Async?IO在Python中的異步編程工作實(shí)例解析
引言
許多程序員都熟悉編寫順序(同步)代碼,在異步世界中,事件的發(fā)生獨(dú)立于主程序流程。這意味著動作在后臺執(zhí)行,無需等待上一個動作完成。
換句話說,代碼行是并發(fā)執(zhí)行的。
想象一下,你有一些獨(dú)立的任務(wù),每個任務(wù)都需要大量的運(yùn)行時(shí)間才能完成。他們的輸出不相互依賴。所以,您想一次啟動它們。如果這些任務(wù)按特定順序執(zhí)行,程序?qū)⒉坏貌坏却總€任務(wù)完成后再開始下一個任務(wù)。等待時(shí)間會阻塞程序。
異步編程范例有助于并發(fā)執(zhí)行這些任務(wù),并確保您可以克服等待時(shí)間并更有效地使用資源。
在 Python 中引入異步
Python 中引入了兩個主要組件:
1.
async io
這是一個 Python 包,允許 API 運(yùn)行和管理協(xié)程。2.
async/await
用來定義協(xié)程。
例如要進(jìn)行 HTTP 調(diào)用,請考慮使用 aiohttp
,這是一個 Python 包,允許異步進(jìn)行 HTTP 調(diào)用。在異步代碼中,常用的 requests
庫可能效果不是很好。
同樣,如果您正在使用 Mongo 數(shù)據(jù)庫,而不是依賴同步驅(qū)動程序(如 mongo-python
),您必須使用異步驅(qū)動程序(如 moto
異步訪問 MongoDB)。
在異步世界中,一切都在事件循環(huán)中運(yùn)行。這允許您一次運(yùn)行多個協(xié)程。我們將在本教程中了解協(xié)程是什么。
里面的一切 async def
都是異步代碼,其他一切都是同步的。
編寫異步代碼不像編寫同步代碼那么容易。Python 異步模型基于事件、回調(diào)、傳輸、協(xié)議和期貨等概念。
異步如何工作
asyncio
庫提供了兩個關(guān)鍵字, async
和 await
.
讓我們看一下這個 async hello-world 示例:
import asyncio async def hello(): print("Hello World!") await asyncio.sleep(1) print("Hello again!") asyncio.run(hello()) # Hello World! # Hello again!
乍一看你可能認(rèn)為這是一個同步代碼,因?yàn)榈诙未蛴〉却?1 秒打印Hello again!
在Hello World!
之后。但是這段代碼實(shí)際上是異步的。
協(xié)程
任何定義為 async def
的函數(shù)都是像上面那樣的協(xié)程。需要注意的是,調(diào)用 hello()
函數(shù)需要在 asyncio.run()
中執(zhí)行,
為了運(yùn)行協(xié)程,asyncio
提供了三種主要機(jī)制:
asyncio.run()
函數(shù),它是啟動事件循環(huán)并運(yùn)行異步的主要入口點(diǎn)。
使用 await
協(xié)程的結(jié)果并將控制權(quán)傳遞給事件循環(huán)。
import asyncio import time async def say_something(delay, words): print(f"Before {words}") await asyncio.sleep(delay) print(f"After {words}") async def main(): print(f"Started: {time.strftime('%X')}") await say_something(1, "Task 1") await say_something(2, "Task 2") print(f"Finished: {time.strftime( '%X' )}") asyncio.run(main()) # Started:15:59:52 # Before Task 1 # After Task 1 # Before Task 2 # After Task 2 # Finished:15:59:55
前面的代碼片段仍然等待 say_something()
協(xié)程完成,因此它在 1 秒內(nèi)執(zhí)行第一個任務(wù),然后在等待 2 秒后執(zhí)行第二個任務(wù)。
要讓協(xié)程并發(fā)運(yùn)行,我們應(yīng)該創(chuàng)建任務(wù),這是第三種機(jī)制。
asyncio.create_task()
用于安排協(xié)程執(zhí)行的函數(shù)。
import asyncio import time async def say_something(delay, words): print(f"Before {words}") await asyncio.sleep(delay) print(f"After {words}") async def main(): print(f"Started: {time.strftime('%X')}") task1 = asyncio.create_task(say_something(1,"Task 1")) task2 = asyncio.create_task(say_something(2, "Task 2")) await task1 await task2 print(f"Finished: {time.strftime('%X')}") asyncio.run(main()) # Started:16:07:35 # Before Task 1 # Before Task 2 # After Task 1 # After Task 2 # Finished:16:07:37
上面的代碼現(xiàn)在并發(fā)運(yùn)行,say_something()
協(xié)程不再等待 say_something()
協(xié)程完成。而是同時(shí)運(yùn)行具有不同參數(shù)的同一個協(xié)程。
發(fā)生的情況如下:
say_something()
協(xié)程從參數(shù)的第一個任務(wù)(1 秒和一個字符串Task 1
)開始。這個任務(wù)叫做 task1
。
然后它會暫停協(xié)程的執(zhí)行并等待 1 秒讓 say_something()
協(xié)程在遇到 await
關(guān)鍵字時(shí)完成。它將控制權(quán)返回給事件循環(huán)。
同樣對于第二個任務(wù),它會暫停協(xié)程的執(zhí)行并等待 2 秒讓 say_something()
協(xié)程完成,因?yàn)樗龅?nbsp;await
關(guān)鍵字。
task1
控制返回到事件循環(huán)后,事件循環(huán)繼續(xù)執(zhí)行第二個任務(wù) task2
因?yàn)?nbsp;asyncio.sleep()
還沒有完成。
asyncio.create_task()
包裝 say_something()
函數(shù)并使其作為異步任務(wù)同時(shí)運(yùn)行協(xié)程。 如您所見,上面的代碼片段顯示它的運(yùn)行速度比以前快了 1 秒。
當(dāng) asyncio.create_task()
被調(diào)用時(shí),協(xié)程會自動安排在事件循環(huán)中運(yùn)行。
任務(wù)可以幫助您并發(fā)運(yùn)行多個協(xié)程,但這并不是實(shí)現(xiàn)并發(fā)的唯一方法。
使用 asyncio.gather() 運(yùn)行并發(fā)任務(wù)
另一種同時(shí)運(yùn)行多個協(xié)程的方法是使用 asyncio.gather()
函數(shù)。此函數(shù)將協(xié)程作為參數(shù)并并發(fā)運(yùn)行它們。
import asyncio import time async def greetings(): print("Welcome") await asyncio.sleep(1) print("Goodbye") async def main(): start = time.time() await asyncio.gather(greetings(), greetings()) elapsed = time.time() - start print(f"{__name__} executed in {elapsed:0.2f} seconds.") asyncio.run(main()) # Welcome # Welcome # Goodbye # Goodbye # __main__ executed in 1.00 seconds.
在前面的代碼中,greetings()
協(xié)程被并發(fā)執(zhí)行了兩次。
等待對象
如果一個對象可以與 await
關(guān)鍵字一起使用,則該對象稱為可等待對象??傻却龑ο笾饕?3 種類型:coroutines
、tasks
和futures
。
coroutines
import asyncio async def mult(first, second): print("Calculating multiplication...") await asyncio.sleep(1) mul = first * second print(f"{first} multiplied by {second} is {mul}") return mul async def add(first, second): print("Calculating sum...") await asyncio.sleep(1) sum = first + second print(f"Sum of {first} and {second} is {sum}") return sum async def main(first, second): await mult(first, second) await add(first, second) asyncio.run(main(10, 20)) # Calculating multiplication... # 10 multiplied by 20 is 200 # Calculating sum... # Sum of 10 and 20 is 30
在前面的示例中, 協(xié)同程序 main()
等待 mult()
和 add()
結(jié)束。
假設(shè)您在 mult
協(xié)程之前省略了 await
關(guān)鍵字。然后您將收到以下錯誤: RuntimeWarning: coroutine 'mult' was never awaited.
。
tasks
要安排協(xié)程在事件循環(huán)中運(yùn)行,我們使用 asyncio.create_task()
函數(shù)。
import asyncio async def mult(first, second): print("Calculating multiplication...") await asyncio.sleep(1) mul = first * second print(f"{first} multiplied by {second} is {mul}") return mul async def add(first, second): print("Calculating sum...") await asyncio.sleep(1) sum = first + second print(f"Sum of {first} and {second} is {sum}") return sum async def main(first, second): mult_task = asyncio.create_task(mult(first, second)) add_task = asyncio.create_task(add(first, second)) await mult_task await add_task asyncio.run(main(10, 20)) # Calculating multiplication... # Calculating sum... # 10 multiplied by 20 is 200 # Sum of 10 and 20 is 30
futures
Future 是表示異步計(jì)算結(jié)果的低級可等待對象。它是通過調(diào)用 asyncio.Future()
函數(shù)創(chuàng)建的。
from asyncio import Future future = Future() print(future.done()) print(future.cancelled()) future.cancel() print(future.done()) print(future.cancelled()) # False # False # True # True
超時(shí)
用于 asyncio.wait_for(aw, timeout, *)
設(shè)置等待對象完成的超時(shí)。請注意, aw
這里是可等待的對象。如果要在可等待對象完成時(shí)間過長時(shí)引發(fā)異常,這將很有用。作為異常 asyncio.TimeoutError
。
import asyncio async def slow_operation(): await asyncio.sleep(400) print("Completed.") async def main(): try: await asyncio.wait_for(slow_operation(), timeout=1.0) except asyncio.TimeoutError: print("Timed out!") asyncio.run(main()) # Timed out!
盡管協(xié)程需要 400 秒才能完成,但 slow_operation()
中的超時(shí) Future 設(shè)置為 1 秒。
相關(guān)文章
python中的classmethod與staticmethod
這篇文章主要介紹了python中的classmethod與staticmethod,2022-01-01OpenCV-Python實(shí)現(xiàn)圖像平滑處理操作
圖像平滑處理的噪聲取值主要有6種方法,本文主要介紹了這6種方法的具體使用并配置實(shí)例方法,具有一定的參考價(jià)值,感興趣的可以了解一下2021-06-06Python使用Keras OCR實(shí)現(xiàn)從圖像中刪除文本
這篇文章主要為大家介紹了如何在Python中利用Keras OCR實(shí)現(xiàn)快速地從圖像中刪除文本,從而作為圖像分類器的預(yù)處理步驟,需要的可以參考一下2022-03-03Python cookbook(字符串與文本)在字符串的開頭或結(jié)尾處進(jìn)行文本匹配操作
這篇文章主要介紹了Python cookbook(字符串與文本)在字符串的開頭或結(jié)尾處進(jìn)行文本匹配操作,涉及Python使用str.startswith()和str.endswith()方法針對字符串開始或結(jié)尾處特定文本匹配操作相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2018-04-04python框架flask入門之環(huán)境搭建及開啟調(diào)試
這篇文章主要介紹了python框架flask入門環(huán)境搭建及開啟調(diào)試的步驟設(shè)置,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06OpenCV+MediaPipe實(shí)現(xiàn)手部關(guān)鍵點(diǎn)識別
這篇文章主要介紹了如何通過OpenCV MediaPipe實(shí)現(xiàn)手部關(guān)鍵點(diǎn)識別,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)或工作有一定的價(jià)值,需要的可以參考一下2022-01-01