python異步編程之a(chǎn)syncio高階API的使用詳解
1.asyncio 高階API列表
asyncio中函數(shù)可以分為高階函數(shù)和低階函數(shù)。低階函數(shù)用于調(diào)用事件循環(huán)、linux 套接字、信號(hào)等更底層的功能,高階函數(shù)是屏蔽了更多底層細(xì)節(jié)的任務(wù)并發(fā),任務(wù)執(zhí)行函數(shù)。通常開發(fā)中使用更多的是高階函數(shù)。本篇主要介紹asyncio中常用的高階函數(shù)。
由于asyncio在不同的版本中有差異,本文以及本系列都以python3.10為準(zhǔn)。
函數(shù) | 功能 |
---|---|
run() | 創(chuàng)建事件循環(huán),運(yùn)行一個(gè)協(xié)程,關(guān)閉事件循環(huán)。 |
create_task() | 創(chuàng)建一個(gè)asyncio的Task對(duì)象 |
await sleep() | 休眠幾秒 |
await gather() | 并發(fā)執(zhí)行所有事件的調(diào)度和等待 |
await wait_for() | 有超時(shí)控制的運(yùn)行 |
await shield() | 屏蔽取消操作 |
await wait() | 完成情況的監(jiān)控器 |
current_task() | 返回當(dāng)前Task對(duì)象 |
all_tasks() | 返回事件循環(huán)中所有的task對(duì)象 |
Task | Task對(duì)象 |
to_thread() | 在不同的 OS 線程中異步地運(yùn)行一個(gè)函數(shù) |
run_coroutine_threadsafe() | 從其他OS線程中調(diào)度一個(gè)協(xié)程 |
for in as_completed() | 用 for 循環(huán)監(jiān)控完成情況 |
2.run
函數(shù)原型:
asyncio.run(coro, *, debug=False)
功能:創(chuàng)建事件循環(huán),運(yùn)行傳入的協(xié)程。該函數(shù)總是會(huì)創(chuàng)建一個(gè)新的事件循環(huán)并在結(jié)束時(shí)關(guān)閉它,應(yīng)該被當(dāng)做asyncio程序的主入口點(diǎn)。run() 函數(shù)是用來創(chuàng)建事件,將task加入事件,運(yùn)行事件的函數(shù)。
async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main())
run() 從功能上等價(jià)于以下低階API。獲取一個(gè)事件循環(huán),創(chuàng)建一個(gè)task,加入事件循環(huán)。
loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task)
3.create_task
函數(shù)原型:
asyncio.create_task(coro, *, name=None)
功能:將協(xié)程函數(shù)封裝成一個(gè)Task。協(xié)程函數(shù)沒有生命周期,但是Task有生命周期。
將協(xié)程打包為一個(gè) Task 并自動(dòng)尋找事件循環(huán)加入。返回 Task 對(duì)象。該任務(wù)會(huì)在 get_running_loop() 返回的循環(huán)中執(zhí)行,如果當(dāng)前線程沒有在運(yùn)行的循環(huán)則會(huì)引發(fā) RuntimeError。
async def coro(): await asyncio.sleep(1) print("i am coro") async def main(): task = asyncio.create_task(coro()) print(f"task狀態(tài):{task._state}") await asyncio.sleep(2) print(f"task狀態(tài):{task._state}") print("i am main") asyncio.run(main())
結(jié)果:
task狀態(tài):PENDING
i am coro task
狀態(tài):FINISHED
i am main
結(jié)果分析:
可以看到task運(yùn)行中的狀態(tài)和結(jié)束的生命周期狀態(tài)
4.gather
函數(shù)原型:
asyncio.gather(*aws, return_exceptions=False)
功能:
并發(fā)執(zhí)行所有可等待對(duì)象,收集任務(wù)結(jié)果,返回所有已經(jīng)完成的task的結(jié)果。結(jié)果將是一個(gè)由所有返回值組成的列表。結(jié)果值的順序與傳入的task的順序一致。可等待對(duì)象可以是協(xié)程和task。
如果序列中是協(xié)程而不是task,那么會(huì)將其自動(dòng)封裝成task加入事件循環(huán)。
import asyncio async def coro(value): print(f"hello coro{value}") return f"coro{value}" async def main(): tasks = [coro(i) for i in range(5)] res = await asyncio.gather(*tasks) for i in res: print(i) asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro0
coro1
coro2
coro3
coro4
結(jié)果分析:
獲取了所有協(xié)程的返回值,并且返回的順序和任務(wù)的順序一致。
5.wait
函數(shù)原型:
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
功能:
并發(fā)地運(yùn)行序列中的可等待對(duì)象,并進(jìn)入阻塞狀態(tài)直到滿足 return_when 所指定的條件。將task任務(wù)結(jié)果收集起來,返回兩個(gè) Task/Future 集合: (done, pending)。done是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),未完成的原因可能是超時(shí)或return_when策略。
aws:
aws中保存的是task而不是協(xié)程,從3.8起不建議傳入?yún)f(xié)程,3.11將不再支持傳入?yún)f(xié)程。
timeout:
如指定 timeout (float 或 int 類型) 則它將被用于控制返回之前等待的最長(zhǎng)秒數(shù)。
請(qǐng)注意此函數(shù)不會(huì)引發(fā) asyncio.TimeoutError。當(dāng)超時(shí)發(fā)生時(shí),未完成的 Future 或 Task 將不會(huì)繼續(xù)執(zhí)行,不會(huì)返回結(jié)果。
return_when:
return_when 指定此函數(shù)應(yīng)在何時(shí)返回。它必須為以下參數(shù)之一:
參數(shù) | 描述 |
---|---|
FIRST_COMPLETED | 函數(shù)將在任意可等待對(duì)象結(jié)束或取消時(shí)返回。 |
FIRST_EXCEPTION | 函數(shù)將在任意可等待對(duì)象因引發(fā)異常而結(jié)束時(shí)返回。當(dāng)沒有引發(fā)任何異常時(shí)它就相當(dāng)于 ALL_COMPLETED。 |
ALL_COMPLETED | 函數(shù)將在所有可等待對(duì)象結(jié)束或取消時(shí)返回。 |
基礎(chǔ)使用示例:
import asyncio async def coro(value): print(f"hello coro{value}") return f"coro{value}" async def main(): tasks = [asyncio.create_task(coro(i)) for i in range(5)] done, pending = await asyncio.wait(tasks) for i in done: print(i.result()) asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro1
coro2
coro0
coro3
coro4
結(jié)果分析:
返回結(jié)果和執(zhí)行順序并不是一致的
指定超時(shí)時(shí)間:
import asyncio from asyncio import FIRST_COMPLETED async def coro(value): print(f"hello coro{value}") await asyncio.sleep(value) return f"coro{value}" async def main(): tasks = [asyncio.create_task(coro(i)) for i in range(5)] done, pending = await asyncio.wait(tasks, timeout=3) print("---------finish----------") for i in done: print(i.result()) print("---------pending----------") for i in pending: print(i) asyncio.run(main())
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro1
coro2
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future finished result=None>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
結(jié)果分析:
超時(shí)未完成的task會(huì)保存在pending中,未完成的task在超時(shí)之后不會(huì)繼續(xù)執(zhí)行,沒有返回結(jié)果。
return_when配置任意任務(wù)完成就返回:
import asyncio from asyncio import FIRST_COMPLETED async def coro(value): print(f"hello coro{value}") await asyncio.sleep(value) return f"coro{value}" async def main(): tasks = [asyncio.create_task(coro(i)) for i in range(5)] done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED) print("---------finish----------") for i in done: print(i.result()) print("---------pending----------") for i in pending: print(i) asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-3' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-4' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
結(jié)果分析:
獲取到任意結(jié)果就返回,未完成的task保存在pending中。未完成的task在超時(shí)之后不會(huì)繼續(xù)執(zhí)行。
6.as_completed
函數(shù)原型:
asyncio.as_completed(aws, *, timeout=None)
說明:并發(fā)執(zhí)行aws中保存的可等待對(duì)象,返回一個(gè)協(xié)程的迭代器??梢詮牡髦腥〕鲎钕葓?zhí)行完成的task的結(jié)果。返回結(jié)果和執(zhí)行順序不一致。aws中可以是task或協(xié)程序列。
import asyncio async def coro(value): print(f"hello coro{value}") return f"coro{value}" async def main(): tasks = [coro(i) for i in range(5)] for item in asyncio.as_completed(tasks): res = await item print(res) asyncio.run(main())
結(jié)果:
hello coro2
hello coro3
hello coro4
hello coro1
hello coro0
coro2
coro3
coro4
coro1
coro0
結(jié)果分析:
所有任務(wù)都會(huì)執(zhí)行完成,沒有超時(shí)配置。返回順序和執(zhí)行順序無關(guān)。
7.gather、wait、as_completed 異同點(diǎn)小結(jié)
asyncio協(xié)程體系中可以實(shí)現(xiàn)創(chuàng)建多個(gè)任務(wù)并發(fā)執(zhí)行的函數(shù)有以下三個(gè):
- asyncio.gather
- asyncio.wait
- asyncio.as_completed
不同之處比較:
特性/函數(shù) | gather | wait | as_completed |
---|---|---|---|
入?yún)?/td> | 同時(shí)支持task和協(xié)程序列 | 只支持task序列 | 同時(shí)支持task和協(xié)程序列 |
獲取結(jié)果順序 | 有序,和并發(fā)序列順序相同 | 無序,和并發(fā)序列無關(guān) | 無序,和并發(fā)序列無關(guān) |
返回 | 返回結(jié)果列表,保存的是函數(shù)返回值。 | 返回元組done、pending。元組中保存的是task,而非task 的函數(shù)返回值 | 返回一個(gè)迭代器,從中可迭代出函數(shù)返回值。 |
8.wait for
函數(shù)原型:
asyncio.wait_for(aw, timeout)
功能:執(zhí)行單個(gè)可等待對(duì)象,指定 timeout 秒數(shù)后超時(shí)
等待可等待對(duì)象完成,指定timeout秒數(shù)后超時(shí)。和gather類似,可以自動(dòng)將協(xié)程轉(zhuǎn)化成任務(wù)加入循環(huán)。
timeout 可以為 None,也可以為 float 或 int 型數(shù)值表示的等待秒數(shù)。如果 timeout 為 None,則等待直到完成。
如果發(fā)生超時(shí),任務(wù)將取消并引發(fā) asyncio.TimeoutError。
async def coro(): # 睡眠5s await asyncio.sleep(3600) print('finish!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(coro(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main())
結(jié)果:
timeout!
以上就是python異步編程之a(chǎn)syncio高階API的使用詳解的詳細(xì)內(nèi)容,更多關(guān)于python asyncio高階API的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
掌握python polars庫(kù)進(jìn)行高效高速的數(shù)據(jù)處理。
這篇文章主要介紹了python polars庫(kù)進(jìn)行高效高速的數(shù)據(jù)處理技巧詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01python?tkinter自定義實(shí)現(xiàn)Expander控件
和其他成熟的GUI庫(kù)相比,tkinter的組件并不是太多,但在自定義組件這一點(diǎn)上,并不遜色于其他框架,下面小編就教大家如何自定義一個(gè)Expander控件吧2023-08-08TensorFlow tf.nn.softmax_cross_entropy_with_logits的用法
這篇文章主要介紹了TensorFlow tf.nn.softmax_cross_entropy_with_logits的用法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04Python使用psutil庫(kù)實(shí)現(xiàn)系統(tǒng)監(jiān)控與管理詳解
在我們的測(cè)試工作中,監(jiān)控和管理系統(tǒng)資源是一項(xiàng)重要的任務(wù),本文將介紹如何使用psutil庫(kù)來實(shí)現(xiàn)系統(tǒng)監(jiān)控和管理,以及一些實(shí)用的技巧和示例,希望對(duì)大家有所幫助2022-10-10Python+matplotlib調(diào)用隨機(jī)函數(shù)生成變化圖形
這篇文章主要介紹了如何在Python中利用隨機(jī)函數(shù)生成變化的圖形,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)有一定吧參考價(jià)值,需要的可以了解一下2022-04-04python中squeeze的超詳細(xì)解釋(附代碼示例)
這篇文章主要介紹了python中squeeze的超詳細(xì)解釋,squeeze操作用于去除張量或數(shù)組中大小為1的維度,簡(jiǎn)化數(shù)據(jù)結(jié)構(gòu),在PyTorch和NumPy中都有類似的功能,需要的朋友可以參考下2025-03-03