Python中的asyncio代碼詳解
asyncio介紹
熟悉c#的同學(xué)可能知道,在c#中可以很方便的使用 async 和 await 來(lái)實(shí)現(xiàn)異步編程,那么在python中應(yīng)該怎么做呢,其實(shí)python也支持異步編程,一般使用 asyncio 這個(gè)庫(kù),下面介紹下什么是 asyncio :
asyncio 是用來(lái)編寫 并發(fā) 代碼的庫(kù),使用 async/await 語(yǔ)法。 asyncio 被用作多個(gè)提供高性能 Python 異步框架的基礎(chǔ),包括網(wǎng)絡(luò)和網(wǎng)站服務(wù),數(shù)據(jù)庫(kù)連接庫(kù),分布式任務(wù)隊(duì)列等等。 asyncio 往往是構(gòu)建 IO 密集型和高層級(jí) 結(jié)構(gòu)化 網(wǎng)絡(luò)代碼的最佳選擇。
asyncio中的基本概念
可以看見(jiàn),使用asyncio庫(kù)我們也可以在python代碼中使用 async 和 await 。在 asyncio 中,有四個(gè)基本概念,分別是:
Eventloop
Eventloop 可以說(shuō)是 asyncio 應(yīng)用的核心,中央總控, Eventloop 實(shí)例提供了注冊(cè)、取消、執(zhí)行任務(wù)和回調(diào) 的方法。 簡(jiǎn)單來(lái)說(shuō),就是我們可以把一些異步函數(shù)注冊(cè)到這個(gè)事件循環(huán)上,事件循環(huán)回循環(huán)執(zhí)行這些函數(shù)(每次只能執(zhí)行一個(gè)),如果當(dāng)前正在執(zhí)行的函數(shù)在等待I/O返回,那么事件循環(huán)就會(huì)暫停它的執(zhí)行去執(zhí)行其他函數(shù)。當(dāng)某個(gè)函數(shù)完成I/O后會(huì)恢復(fù),等到下次循環(huán)到它的時(shí)候就會(huì)繼續(xù)執(zhí)行。
Coroutine
協(xié)程本質(zhì)就是一個(gè)函數(shù),
import asyncio import time async def a(): print('Suspending a') await asyncio.sleep(3) print('Resuming a') async def b(): print('Suspending b') await asyncio.sleep(1) print('Resuming b') async def main(): start = time.perf_counter() await asyncio.gather(a(), b()) print(f'{main.__name__} Cost: {time.perf_counter() - start}') if __name__ == '__main__': asyncio.run(main())
執(zhí)行上述代碼,可以看到類似這樣的輸出:
Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997
關(guān)于協(xié)程的具體介紹,可以參考我以前的文章python中的協(xié)程 不過(guò)以前的那種寫法,需要使用裝飾器,已經(jīng)過(guò)時(shí)了。
Future
Future 是表示一個(gè)“未來(lái)”對(duì)象,類似于 javascript 中的 promise ,當(dāng)異步操作結(jié)束后會(huì)把最終結(jié)果設(shè)置到這個(gè) Future 對(duì)象上, Future 是對(duì)協(xié)程的封裝。
>>> import asyncio >>> def fun(): ... print("inner fun") ... return 111 ... >>> loop = asyncio.get_event_loop() >>> future = loop.run_in_executor(None, fun) #這里沒(méi)有使用await inner fun >>> future #可以看到,fun方法狀態(tài)是pending <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]> >>> future.done() # 還沒(méi)有完成 False >>> [m for m in dir(future) if not m.startswith('_')] ['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result'] >>> future.result() #這個(gè)時(shí)候如果直接調(diào)用result()方法會(huì)報(bào)錯(cuò) Traceback (most recent call last): File "<input>", line 1, in <module> asyncio.base_futures.InvalidStateError: Result is not set. >>> async def runfun(): ... result=await future ... print(result) ... >>>loop.run_until_complete(runfun()) #也可以通過(guò) loop.run_until_complete(future) 來(lái)執(zhí)行,這里只是為了演示await 111 >>> future <Future finished result=111> >>> future.done() True >>> future.result() 111 Task
Eventloop 除了支持協(xié)程,還支持注冊(cè) Future 和 Task 2種類型的對(duì)象,而 Future 是協(xié)程的封裝, Future 對(duì)象提供了很多任務(wù)方法(如完成后的回調(diào),取消,設(shè)置任務(wù)結(jié)果等等),但是一般情況下開(kāi)發(fā)者不需要操作 Future 這種底層對(duì)象,而是直接用 Future 的子類 Task 協(xié)同的調(diào)度協(xié)程來(lái)實(shí)現(xiàn)并發(fā)。那么什么是 Task 呢?下面介紹下:
一個(gè)與 Future 類似的對(duì)象,可運(yùn)行 Python 協(xié)程。非線程安全。 Task 對(duì)象被用來(lái)在事件循環(huán)中運(yùn)行協(xié)程。如果一個(gè)協(xié)程在等待一個(gè) Future 對(duì)象, Task 對(duì)象會(huì)掛起該協(xié)程的執(zhí)行并等待該 Future 對(duì)象完成。當(dāng)該 Future 對(duì)象完成被打包的協(xié)程將恢復(fù)執(zhí)行。 事件循環(huán)使用協(xié)同日程調(diào)度: 一個(gè)事件循環(huán)每次運(yùn)行一個(gè) Task 對(duì)象。而一個(gè) Task 對(duì)象會(huì)等待一個(gè) Future 對(duì)象完成,該事件循環(huán)會(huì)運(yùn)行其他 Task 、回調(diào)或執(zhí)行IO操作。
下面看看用法:
>>> async def a(): ... print('Suspending a') ... await asyncio.sleep(3) ... print('Resuming a') ... >>> task = asyncio.ensure_future(a()) >>> loop.run_until_complete(task) Suspending a Resuming a
asyncio中一些常見(jiàn)用法的區(qū)別
Asyncio.gather和asyncio.wait
我們?cè)谏厦娴拇a中用到過(guò) asyncio.gather ,其實(shí)還有另外一種用法是 asyncio.wait ,他們都可以讓多個(gè)協(xié)程并發(fā)執(zhí)行,那么他們有什么區(qū)別呢?下面介紹下。
>>> import asyncio >>> async def a(): ... print('Suspending a') ... await asyncio.sleep(3) ... print('Resuming a') ... return 'A' ... ... ... async def b(): ... print('Suspending b') ... await asyncio.sleep(1) ... print('Resuming b') ... return 'B' ... >>> async def fun1(): ... return_value_a, return_value_b = await asyncio.gather(a(), b()) ... print(return_value_a,return_value_b) ... >>> asyncio.run(fun1()) Suspending a Suspending b Resuming b Resuming a A B >>> async def fun2(): ... done,pending=await asyncio.wait([a(),b()]) ... print(done) ... print(pending) ... task=list(done)[0] ... print(task) ... print(task.result()) ... >>> asyncio.run(fun2()) Suspending b Suspending a Resuming b Resuming a {<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>} set() <Task finished coro=<a() done, defined at <input>:1> result='A'> A
根據(jù)上述代碼,我們可以看出兩者的區(qū)別:
asyncio.gather 能收集協(xié)程的結(jié)果,而且會(huì)按照輸入?yún)f(xié)程的順序保存對(duì)應(yīng)協(xié)程的執(zhí)行結(jié)果,而 asyncio.wait 的返回值有兩項(xiàng),第一項(xiàng)是完成的任務(wù)列表,第二項(xiàng)表示等待完成的任務(wù)列表。
asyncio.wait 支持接受一個(gè)參數(shù) return_when ,在默認(rèn)情況下, asyncio.wait 會(huì)等待全部任務(wù)完成 (return_when='ALL_COMPLETED') ,它還支持 FIRST_COMPLETED (第一個(gè)協(xié)程完成就返回)和 FIRST_EXCEPTION (出現(xiàn)第一個(gè)異常就返回):
>>> async def fun2(): ... done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED) ... print(done) ... print(pending) ... task=list(done)[0] ... print(task) ... print(task.result()) ... >>> asyncio.run(fun2()) Suspending a Suspending b Resuming b {<Task finished coro=<b() done, defined at <input>:8> result='B'>} {<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>} <Task finished coro=<b() done, defined at <input>:8> result='B'> B
一般情況下,用 asyncio.gather 就足夠了。
asyncio.create_task和loop.create_task以及asyncio.ensure_future
這三種方法都可以創(chuàng)建 Task ,從Python3.7開(kāi)始可以統(tǒng)一的使用更高階的 asyncio.create_task .其實(shí) asyncio.create_task 就是用的 loop.create_task . loop.create_task 接受的參數(shù)需要是一個(gè)協(xié)程,但是 asyncio.ensure_future 除了接受協(xié)程,還可以是 Future 對(duì)象或者 awaitable 對(duì)象:
- 如果參數(shù)是協(xié)程,其底層使用 loop.create_task ,返回 Task 對(duì)象
- 如果是 Future 對(duì)象會(huì)直接返回
- 如果是一個(gè) awaitable 對(duì)象,會(huì) await 這個(gè)對(duì)象的 __await__ 方法,再執(zhí)行一次 ensure_future ,最后返回 Task 或者 Future 。
所以 ensure_future 方法主要就是確保這是一個(gè) Future 對(duì)象,一般情況下直接用 asyncio.create_task 就可以了。
注冊(cè)回調(diào)和執(zhí)行同步代碼
可以使用 add_done_callback 來(lái)添加成功回調(diào):
def callback(future): print(f'Result: {future.result()}') def callback2(future, n): print(f'Result: {future.result()}, N: {n}') async def funa(): await asyncio.sleep(1) return "funa" async def main(): task = asyncio.create_task(funa()) task.add_done_callback(callback) await task #這樣可以為callback傳遞參數(shù) task = asyncio.create_task(funa()) task.add_done_callback(functools.partial(callback2, n=1)) await task if __name__ == '__main__': asyncio.run(main())
執(zhí)行同步代碼
如果有同步邏輯,想要用 asyncio 來(lái)實(shí)現(xiàn)并發(fā),那么需要怎么做呢?下面看看:
def a1(): time.sleep(1) return "A" async def b1(): await asyncio.sleep(1) return "B" async def main(): loop = asyncio.get_running_loop() await asyncio.gather(loop.run_in_executor(None, a1), b1()) if __name__ == '__main__': start = time.perf_counter() asyncio.run(main()) print(f'main method Cost: {time.perf_counter() - start}') # 輸出: main method Cost: 1.0050589740000002
可以使用 run_into_executor
來(lái)將同步函數(shù)邏輯轉(zhuǎn)化成一個(gè)協(xié)程,第一個(gè)參數(shù)是要傳遞 concurrent.futures.Executor 實(shí)例的,傳遞 None 會(huì)選擇默認(rèn)的 executor 。
總結(jié)
以上所述是小編給大家介紹的Python中的asyncio代碼詳解,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
如果你覺(jué)得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!
- 在Python3中使用asyncio庫(kù)進(jìn)行快速數(shù)據(jù)抓取的教程
- 探索Python3.4中新引入的asyncio模塊
- Python中使用asyncio 封裝文件讀寫
- Python使用asyncio包處理并發(fā)詳解
- Python中的并發(fā)處理之a(chǎn)syncio包使用的詳解
- 詳解python異步編程之a(chǎn)syncio(百萬(wàn)并發(fā))
- Python中asyncio與aiohttp入門教程
- Python中asyncio模塊的深入講解
- python協(xié)程與?asyncio?庫(kù)詳情
- Python中asyncio的多種用法舉例(異步同步)
相關(guān)文章
python爬蟲指南之xpath實(shí)例解析(附實(shí)戰(zhàn))
在進(jìn)行網(wǎng)頁(yè)抓取的時(shí)候,分析定位html節(jié)點(diǎn)是獲取抓取信息的關(guān)鍵,目前我用的是lxml模塊,下面這篇文章主要給大家介紹了關(guān)于python爬蟲指南之xpath實(shí)例解析的相關(guān)資料,需要的朋友可以參考下2022-01-01Python apscheduler實(shí)現(xiàn)定時(shí)任務(wù)的方法詳解
apscheduler(Advanced Python Scheduler)是一個(gè)用于Python的靈活、強(qiáng)大的定時(shí)任務(wù)調(diào)度庫(kù),它允許您以各種方式安排函數(shù)或方法的執(zhí)行,下面就跟隨小編一起學(xué)習(xí)一下它的具體使用吧2023-10-10使用IPython來(lái)操作Docker容器的入門指引
這篇文章主要介紹了使用IPython來(lái)操作Docker容器的方法,包括一些基本的搭建和連接,主要依靠docker-py模塊,需要的朋友可以參考下2015-04-04Numpy中的shape、reshape函數(shù)的區(qū)別
本文主要介紹了Numpy中的shape、reshape函數(shù)的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Python字符串的encode與decode研究心得亂碼問(wèn)題解決方法
為什么Python使用過(guò)程中會(huì)出現(xiàn)各式各樣的亂碼問(wèn)題,明明是中文字符卻顯示成“\xe4\xb8\xad\xe6\x96\x87”的形式?2009-03-03Python中dumps與dump及l(fā)oads與load的區(qū)別
這篇文章主要介紹了Python中dumps與dump、loads與load的區(qū)別,json模塊提供了一種很簡(jiǎn)單的方式來(lái)編碼和解碼JSON數(shù)據(jù)。其中兩個(gè)主要的函數(shù)是json.dumps()和json.loads(),需要的朋友可以參考下2022-04-04JSON文件及Python對(duì)JSON文件的讀寫操作
JSON和XML都是互聯(lián)網(wǎng)上數(shù)據(jù)交換的主要載體。這篇文章主要介紹了JSON文件及Python對(duì)JSON文件的讀寫操作,需要的朋友可以參考下2018-10-10python的格式化輸出(format,%)實(shí)例詳解
Python中格式化字符串目前有兩種陣營(yíng):%和format,哪一種比較適合我們使用呢?下面腳本之家小編給大家介紹下python的格式化輸出(format,%)實(shí)例詳解,感興趣的朋友一起看看吧2018-06-06