簡單有效上手Python3異步asyncio問題
Python3異步asyncio問題
官方文檔:
https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run
看了一大堆相關的資料和教程,針對的Python版本不同,寫法也各不一致,翻了翻官方的文檔,發(fā)現(xiàn)其實越高版本的Python對異步進行封裝的越方便,官方說法叫高層級API
,甚至都不用去理解什么Future\task\loop之類的概念了,我現(xiàn)在用的是Python 3.7.5
,可以這樣很簡單的實現(xiàn)阻塞等待\異步并發(fā):如果沒有復雜需求的話,用高層級API就可以了。
如果涉及到回調的話貌似還得用低層級的API,后面單獨記錄。
import asyncio async def first(): print('first函數(shù)調用開始,下面將會等待3秒模擬函數(shù)運行完畢') await asyncio.sleep(3) print('first函數(shù)執(zhí)行完畢') async def last(): print('last函數(shù)調用開始') await asyncio.sleep(2) print('last函數(shù)執(zhí)行完畢') async def func(delay): print('開始異步同時執(zhí)行的函數(shù)+延遲: ' + str(delay)) await asyncio.sleep(delay) print('--異步函數(shù)執(zhí)行完畢+延遲: ' + str(delay)) async def main(): await first() # 這里先調用first()函數(shù),并且等它執(zhí)行完了才會開始 await asyncio.gather( func(1), func(2), func(3) ) await last() asyncio.run(main())
上面代碼實際執(zhí)行的過程是:
- 開始執(zhí)行
first()
函數(shù) - 等
first()
執(zhí)行完畢后開始并發(fā)執(zhí)行下面gather
中加入的 - 三個函數(shù)(任務)三個函數(shù)全部并發(fā)執(zhí)行完畢后執(zhí)行
last()
官方文檔中給的建議是只創(chuàng)建一個主入口的main()
函數(shù)(當然這個函數(shù)名可以自定義的),將要調用的其他函數(shù)都放在這個函數(shù)中,然后再使用asyncio.run()
啟動,理想情況下應當只被調用一次.
上圖:
更新
上面所謂的高階層API
用法最后一行asyncio.run(main())
和下面使用低階層API
實現(xiàn)效果是一樣的:
loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task)
下面是學習過程中記錄的偏低層實現(xiàn)的資料
最基本的定義和應用
import asyncio # 定義一個可以異步調用的函數(shù),其類型為coroutine async def func1(): pass if __name__ == '__main__': loop = asyncio.get_event_loop() # 定義一個用來循環(huán)異步函數(shù)的loop對象 task = asyncio.ensure_future(func1()) # 創(chuàng)建一個調用異步函數(shù)的任務,task類型為future # task = loop.create_task(func1()) # 使用loop的.create_task()創(chuàng)建任務,效果一樣 loop.run_until_complete(task) # 開始在loop循環(huán)中執(zhí)行異步函數(shù),直到該函數(shù)運行完畢
asyncio.ensure_future(coroutine)
和 loop.create_task(coroutine)
都可以創(chuàng)建一個task
,run_until_complete
的參數(shù)是一個futrue對象。
當傳入一個協(xié)程,其內部會自動封裝成task,task是Future的子類。
isinstance(task, asyncio.Future)
將會輸出True。
future
類型的任務可以在loop.run_until_complete
中執(zhí)行,也可以直接用await
+任務變量名阻塞?調用
什么時候使用異步
import asyncio # 這是一個耗時很少的異步函數(shù) async def msg(text): await asyncio.sleep(0.1) print(text) # 這是一個耗時較長的異步函數(shù) async def long_operation(): print('long_operation started') await asyncio.sleep(3) print('long_operation finished') # 主函數(shù)部分,同樣需要聲明為async異步類型 async def main(): await msg('first') # 現(xiàn)在需要調用一個耗時較長的函數(shù)操作,不希望阻塞的等待它執(zhí)行完畢 # 希望long_operation在開始執(zhí)行后,立即調用msg,這里就可以將long_operation封裝到task任務中 task = asyncio.ensure_future(long_operation()) await msg('second') # 開始task中的long_operation函數(shù) await task # task執(zhí)行完畢后會繼續(xù)下面的代碼 print('All done.') if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main())
運行結果:
并發(fā)和并行
并發(fā)和并行一直是容易混淆的概念。并發(fā)通常指有多個任務需要同時進行,并行則是同一時刻有多個任務執(zhí)行。
用上課來舉例就是,并發(fā)情況下是一個老師在同一時間段輔助不同的人功課。
并行則是好幾個老師分別同時輔助多個學生功課。
簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務。
asyncio實現(xiàn)并發(fā),就需要多個協(xié)程來完成任務,每當有任務阻塞的時候就await,然后其他協(xié)程繼續(xù)工作。
創(chuàng)建多個協(xié)程的列表,然后將這些協(xié)程注冊到事件循環(huán)中。
import asyncio import time now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) start = now() coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task ret: ', task.result()) print('TIME: ', now() - start)
結果如下
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
總時間為4s左右。4s的阻塞時間,足夠前面兩個協(xié)程執(zhí)行完畢。如果是同步順序的任務,那么至少需要7s。此時我們使用了aysncio實現(xiàn)了并發(fā)。
asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,后者接收一堆task。
異步結果回調
找了個別人寫的例子,大致理解了下實現(xiàn)過程:
- 定義
async
異步函數(shù) - 定義普通函數(shù)用于處理回調
- 獲取異步函數(shù)的
coroutine
協(xié)程對象(其實就是不帶await
修飾直接執(zhí)行異步函數(shù)返回的那個對象) - 獲取
loop
循環(huán)對象 - 使用低階層API手動創(chuàng)建
task
任務 - 為
task
任務對象注冊回調函數(shù) - 啟動
loop
循環(huán)調用異步函數(shù)
import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) #task = asyncio.ensure_future(coroutine) # 貌似和上面用loop創(chuàng)建任務效果一樣 task.add_done_callback(callback) loop.run_until_complete(task) print('TIME: ', now() - start)
這里需要注意,在使用低層級API
手動創(chuàng)建異步任務的時候,不能同時使用高層級API
的簡單操作了,比如這里創(chuàng)建task
任務對象的時候,就不能用asyncio.create_task()
,否則會找不到loop
對象,返回下面的錯誤
RuntimeError: no running event loop
翻了一下asyncio\tasks.py
源代碼,原來所謂的高層級API
就是這么簡單的封裝啊…
調用的時候在函數(shù)內部又定義了一遍loop
def create_task(coro): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() return loop.create_task(coro)
我自己寫的例子
創(chuàng)建4個異步任務同時開始執(zhí)行,每個任務執(zhí)行完成后將結果追加到result_list
數(shù)組變量中.
import asyncio result_list = [] async def fun(var): return var + 1 def callbackFun(future): result_list.append(future.result()) task_list = [] for i in range(1, 5): cor = fun(i) task = asyncio.ensure_future(cor) task.add_done_callback(callbackFun) task_list.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) print(result_list)
當然,如果不需要使用回調函數(shù),而是等所有提交的異步任務都執(zhí)行完成后獲取它們的結果,可以這樣寫:
# 前面省略 loop.run_until_complete(asyncio.wait(task_list)) # task_list中的每個任務執(zhí)行完成后可以調用它的.result()方法來獲取結果 for task in task_list: print('每個task執(zhí)行完的結果:', task.result())
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Centos部署django服務nginx+uwsgi的方法
這篇文章主要介紹了Centos部署django服務nginx+uwsgi的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-01-01python matplotlib 注釋文本箭頭簡單代碼示例
這篇文章主要介紹了python matplotlib 注釋文本箭頭簡單代碼示例,具有一定借鑒價值。2018-01-01python+OpenCV人臉識別考勤系統(tǒng)實現(xiàn)的詳細代碼
作為一個基于人臉識別算法的考勤系統(tǒng)的設計與實現(xiàn)教程,以下內容將提供詳細的步驟和代碼示例。本教程將使用 Python 語言和 OpenCV 庫進行實現(xiàn),需要的朋友可以參考下2023-05-05