Python?Asyncio中Coroutines,Tasks,Future可等待對象的關(guān)系及作用
前記
上一遍文章《Python中Async語法協(xié)程的實現(xiàn)》介紹了Python是如何以生成器來實現(xiàn)協(xié)程的以及Python Asyncio通過Future和Task的封裝來實現(xiàn)協(xié)程的調(diào)度,而在Python Asyncio之中Coroutines, Tasks和Future都屬于可等待對象,在使用的Asyncio的過程中,經(jīng)常涉及到三者的轉(zhuǎn)換和調(diào)度,開發(fā)者容易在概念和作用上犯迷糊,本文主要闡述的是三者之間的關(guān)系以及他們的作用。
1.Asyncio的入口
協(xié)程是線程中的一種特例,協(xié)程的入口和切換都是靠事件循環(huán)來調(diào)度的,在新版的Python中協(xié)程的入口是Asyncio.run,當(dāng)程序運行到Asyncio.run后,可以簡單的理解為程序由線程模式切換為協(xié)程模式(只是方便理解,對于計算機而言,并沒有這樣區(qū)分),
以下是一個最小的協(xié)程例子代碼:
import asyncio
async def main():
await asyncio.sleep(0)
asyncio.run(main())在這段代碼中,main函數(shù)和asyncio.sleep都屬于Coroutine,main是通過asyncio.run進(jìn)行調(diào)用的,接下來程序也進(jìn)入一個協(xié)程模式,asyncio.run的核心調(diào)用是Runner.run,它的代碼如下:
class Runner:
...
def run(self, coro, *, context=None):
"""Run a coroutine inside the embedded event loop."""
# 省略代碼
...
# 把coroutine轉(zhuǎn)為task
task = self._loop.create_task(coro, context=context)
# 省略代碼
...
try:
# 如果傳入的是Future或者coroutine,也會專為task
return self._loop.run_until_complete(task)
except exceptions.CancelledError:
# 省略代碼
...這段代碼中刪去了部分其它功能和初始化的代碼,可以看到這段函數(shù)的主要功能是通過loop.create_task方法把一個Coroutine對象轉(zhuǎn)為一個Task對象,然后通過loop.run_until_complete等待這個Task運行結(jié)束。
可以看到,Asycnio并不會直接去調(diào)度Coroutine,而是把它轉(zhuǎn)為Task再進(jìn)行調(diào)度,這是因為在Asyncio中事件循環(huán)的最小調(diào)度對象就是Task。不過在Asyncio中并不是所有的Coroutine的調(diào)用都會先被轉(zhuǎn)為Task對象再等待,比如示例代碼中的asyncio.sleep,由于它是在main函數(shù)中直接await的,所以它不會被進(jìn)行轉(zhuǎn)換,而是直接等待,通過調(diào)用工具分析展示的圖如下:

在這個圖示中,從main函數(shù)到asyncio.sleep函數(shù)中沒有明顯的loop.create_task等把Coroutine轉(zhuǎn)為Task調(diào)用,這里之所以不用進(jìn)行轉(zhuǎn)換的原因不是做了一些特殊優(yōu)化,而是本因如此, 這個await asyncio.sleep函數(shù)實際上還是會被main這個Coroutine轉(zhuǎn)換成的Task繼續(xù)調(diào)度到。
2.兩種Coroutine調(diào)用方法的區(qū)別
在了解Task的調(diào)度原理之前,還是先回到最初的調(diào)用示例,看看直接用Task調(diào)用和直接用Coroutine調(diào)用的區(qū)別是什么。
如下代碼,我們顯示的執(zhí)行一個Coroutine轉(zhuǎn)為Task的操作再等待,那么代碼會變成下面這樣:
import asyncio
async def main():
await asyncio.create_task(asyncio.sleep(0))
asyncio.run(main())這樣的代碼看起來跟最初的調(diào)用示例很像,沒啥區(qū)別,但是如果進(jìn)行一些改變,比如增加一些休眠時間和Coroutine的調(diào)用,就能看出Task對象的作用了,現(xiàn)在編寫兩份文件,
他們的代碼如下:
# demo_coro.py
import asyncio
import time
async def main():
await asyncio.sleep(1)
await asyncio.sleep(2)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 3.0028765201568604
# demo_task.py
import asyncio
import time
async def main():
task_1 = asyncio.create_task(asyncio.sleep(1))
task_2 = asyncio.create_task(asyncio.sleep(2))
await task_1
await task_2
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 2.0027475357055664其中demo_coro.py進(jìn)行了兩次await調(diào)用,程序的運行總時長為3秒,而demo_task.py則是先把兩個Coroutine對象轉(zhuǎn)為Task對象,然后再進(jìn)行兩次await調(diào)用,程序的運行總時長為2秒??梢园l(fā)現(xiàn),demo_task.py的運行時長近似于其中運行最久的Task對象時長,而demo_coro.py的運行時長則是近似于兩個Coroutine對象的總運行時長。
之所以會是這樣的結(jié)果,是因為直接awaitCoroutine對象時,這段程序會一直等待,直到Coroutine對象執(zhí)行完畢再繼續(xù)往下走,而Task對象的不同之處就是在創(chuàng)建的那一刻,就已經(jīng)把自己注冊到事件循環(huán)之中等待被安排運行了,然后返回一個task對象供開發(fā)者等待,由于asyncio.sleep是一個純IO類型的調(diào)用,所以在這個程序中,兩個asyncio.sleepCoroutine被轉(zhuǎn)為Task從而實現(xiàn)了并發(fā)調(diào)用。
3.Task與Future
上述的代碼之所以通過Task能實現(xiàn)并發(fā)調(diào)用,是因為Task中出現(xiàn)了一些與事件循環(huán)交互的函數(shù),正是這些函數(shù)架起了Coroutine并發(fā)調(diào)用的可能, 不過Task是Future的一個子對象,所以在了解Task之前,需要先了解Future。
3.1.Future
與Coroutine只有讓步和接收結(jié)果不同的是Future除了讓步和接收結(jié)果功能外,它還是一個只會被動進(jìn)行事件調(diào)用且?guī)в袪顟B(tài)的容器,它在初始化時就是Pending狀態(tài),這時可以被取消,被設(shè)置結(jié)果和設(shè)置異常。而在被設(shè)定對應(yīng)的操作后,F(xiàn)uture會被轉(zhuǎn)化到一個不可逆的對應(yīng)狀態(tài),并通過loop.call_sonn來調(diào)用所有注冊到本身上的回調(diào)函數(shù),同時它帶有__iter__和__await__方法使其可以被await和yield from調(diào)用,它的主要代碼如下:
class Future:
...
def set_result(self, result):
"""設(shè)置結(jié)果,并安排下一個調(diào)用"""
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def set_exception(self, exception):
"""設(shè)置異常,并安排下一個調(diào)用"""
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
if isinstance(exception, type):
exception = exception()
if type(exception) is StopIteration:
raise TypeError("StopIteration interacts badly with generators "
"and cannot be raised into a Future")
self._exception = exception
self._state = _FINISHED
self.__schedule_callbacks()
self.__log_traceback = True
def __await__(self):
"""設(shè)置為blocking,并接受await或者yield from調(diào)用"""
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result() # May raise too.
__iter__ = __await__ # make compatible with 'yield from'.單看這段代碼是很難理解為什么下面這個future被調(diào)用set_result后就能繼續(xù)往下走:
async def demo(future: asyncio.Future):
await future
print("aha")這是因為Future跟Coroutine一樣,沒有主動調(diào)度的能力,只能通過Task和事件循環(huán)聯(lián)手被調(diào)度。
3.2.Task
Task是Future的子類,除了繼承了Future的所有方法,它還多了兩個重要的方法__step和__wakeup,通過這兩個方法賦予了Task調(diào)度能力,這是Coroutine和Future沒有的,Task的涉及到調(diào)度的主要代碼如下(說明見注釋):
class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation.
_log_destroy_pending = True
def __init__(self, coro, *, loop=None, name=None, context=None):
super().__init__(loop=loop)
# 省略部分初始化代碼
...
# 托管的coroutine
self._coro = coro
if context is None:
self._context = contextvars.copy_context()
else:
self._context = context
# 通過loop.call_sonn,在Task初始化后馬上就通知事件循環(huán)在下次有空的時候執(zhí)行自己的__step函數(shù)
self._loop.call_soon(self.__step, context=self._context)
def __step(self, exc=None):
coro = self._coro
# 方便asyncio自省
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# 通過send預(yù)激托管的coroutine
# 這時候只會得到coroutine yield回來的數(shù)據(jù)或者收到一個StopIteration的異常
# 對于Future或者Task返回的是Self
result = coro.send(None)
else:
# 發(fā)送異常給coroutine
result = coro.throw(exc)
except StopIteration as exc:
# StopIteration代表Coroutine運行完畢
if self._must_cancel:
# coroutine在停止之前被執(zhí)行了取消操作,則需要顯示的執(zhí)行取消操作
self._must_cancel = False
super().cancel(msg=self._cancel_message)
else:
# 把運行完畢的值發(fā)送到結(jié)果值中
super().set_result(exc.value)
# 省略其它異常封裝
...
else:
# 如果沒有異常拋出
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# 通過Future代碼可以判斷,如果帶有_asyncio_future_blocking屬性,則代表當(dāng)前result是Future或者是Task
# 意味著這個Task里面裹著另外一個的Future或者Task
# 省略Future判斷
...
if blocking:
# 代表這這個Future或者Task處于卡住的狀態(tài),
# 此時的Task放棄了自己對事件循環(huán)的控制權(quán),等待這個卡住的Future或者Task執(zhí)行完成時喚醒一下自己
result._asyncio_future_blocking = False
result.add_done_callback(self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(msg=self._cancel_message):
self._must_cancel = False
else:
# 不能被await兩次
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif result is None:
# 放棄了對事件循環(huán)的控制權(quán),代表自己托管的coroutine可能有個coroutine在運行,接下來會把控制權(quán)交給他和事件循環(huán)
# 當(dāng)前的coroutine里面即使沒有Future或者Task,但是子Future可能有
self._loop.call_soon(self.__step, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
# 其它Task和Future完成后會調(diào)用到該函數(shù),接下來進(jìn)行一些處理
try:
# 回收Future的狀態(tài),如果Future發(fā)生了異常,則把異常傳回給自己
future.result()
except BaseException as exc:
# This may also be a cancellation.
self.__step(exc)
else:
# Task并不需要自己托管的Future的結(jié)果值,而且如下注釋,這樣能使調(diào)度變得更快
# Don't pass the value of `future.result()` explicitly,
# as `Future.__iter__` and `Future.__await__` don't need it.
# If we call `_step(value, None)` instead of `_step()`,
# Python eval loop would use `.send(value)` method call,
# instead of `__next__()`, which is slower for futures
# that return non-generator iterators from their `__iter__`.
self.__step()
self = None # Needed to break cycles when an exception occurs.這份源碼的Task對象中的__setp方法比較長,經(jīng)過精簡后可以發(fā)現(xiàn)他主要做的工作有三個:
- 1.通過
send或者throw來驅(qū)動Coroutine進(jìn)行下一步 - 2.通過給被自己托管的Future或者Task添加回調(diào)來獲得完成的通知并重新獲取控制權(quán)
- 3.通過
loop.call_soon來讓步,把控制權(quán)交給事件循環(huán)
單通過源碼分析可能很難明白, 以下是以兩種Coroutine的代碼為例子,簡單的闡述Task與事件循環(huán)調(diào)度的過程,首先是demo_coro,這個例子中只有一個Task:
# demo_coro.py
import asyncio
import time
async def main():
await asyncio.sleep(1)
await asyncio.sleep(2)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 3.0028765201568604這個例子中第一步是把main轉(zhuǎn)為一個Task,然后調(diào)用到了對應(yīng)的__step方法,這時候__step方法會會調(diào)用main()這個Coroutine的send(None)方法。
之后整個程序的邏輯會直接轉(zhuǎn)到main函數(shù)中的await asyncio.sleep(1)這個Coroutine中,await asyncio.sleep(1)會先生成一個Future對象,并通過loop.call_at告訴事件循環(huán)在1秒后激活這個Future對象,然后把對象返回。這時候邏輯會重新回到Task的__step方法中,__step發(fā)現(xiàn)send調(diào)用得到的是一個Future對象,所以就在這個Future添加一個回調(diào),讓Future完成的時候來激活自己,然后放棄了對事件循環(huán)的控制權(quán)。接著就是事件循環(huán)在一秒后激活了這個Future對象,這時程序邏輯就會執(zhí)行到Future的回調(diào),也就是Task的__wakeup方法,于是Task的__step又被調(diào)用到了,而這次遇到的是后面的await asyncio.sleep(2),于是又走了一遍上面的流程。當(dāng)兩個asyncio.sleep都執(zhí)行完成后,Task的__step方法里在對Coroutine發(fā)送一個send(None)后就捕獲到了StopIteration異常,這時候Task就會通過set_result設(shè)置結(jié)果,并結(jié)束自己的調(diào)度流程。
可以看到demo_core.py中只有一個Task在負(fù)責(zé)和事件循環(huán)一起調(diào)度,事件循環(huán)的開始一定是一個Task,并通過Task來調(diào)起一個Coroutine,通過__step方法把后續(xù)的Future,Task,Coroutine都當(dāng)成一條鏈來運行,而demo_task.py則不一樣了,它有兩個Task,代碼如下:
# demo_task.py
import asyncio
import time
async def main():
task_1 = asyncio.create_task(asyncio.sleep(1))
task_2 = asyncio.create_task(asyncio.sleep(2))
await task_1
await task_2
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 2.0027475357055664這個例子中第一步還是跟demo_coro一樣,但跳轉(zhuǎn)到main函數(shù)后就開始有區(qū)別了,首先在這函數(shù)中創(chuàng)建了task1和task2兩個Task,他們分別都會通過__step方法中的send激活對應(yīng)的asyncio.sleepCoroutine,然后等待對應(yīng)的Future來通知自己已經(jīng)完成了。而對于創(chuàng)建了這兩個Task的main Task來說,通過main函數(shù)的awati task_1和await task_2來獲取到他們的“控制權(quán)“。首先是通過await task_1語句,main Task中的__step方法里在調(diào)用send后得到的是task_1對應(yīng)的Future,這時候就可以為這個Future添加一個回調(diào),讓他完成時通知自己,自己再走下一步,對于task_2也是如此。 直到最后兩個task都執(zhí)行完成,main Task也捕獲到了StopIteration異常,通過set_result設(shè)置結(jié)果,并結(jié)束自己的調(diào)度流程。
可以看到demo_task.py與demo_coro.py有個明顯的區(qū)別在于main Task在運行的生命周期中創(chuàng)建了兩個Task,并通過await托管了兩個Task,同時兩個Task又能實現(xiàn)兩個協(xié)程的并發(fā),所以可以發(fā)現(xiàn)事件循環(huán)運行期間,當(dāng)前協(xié)程的并發(fā)數(shù)永遠(yuǎn)小于事件循環(huán)中注冊的Task數(shù)量。此外,如果在main Task中如果沒有顯式的進(jìn)行await,那么子Task就會逃逸,不受main Task管理,如下:
# demo_task.py
import asyncio
import time
def mutli_task():
task_1 = asyncio.create_task(asyncio.sleep(1))
task_2 = asyncio.create_task(asyncio.sleep(2))
async def main():
mutli_task()
await asyncio.sleep(1.5)
s_t = time.time()
asyncio.run(main())
print(time.time() - s_t)
# // Output: 1.5027475357055664 在這段代碼中,main Task在執(zhí)行到mutli_task時,會創(chuàng)建出兩個task,但是在__step中的coro.send(None)調(diào)用得到的結(jié)果卻是await asyncio.sleep(1.5)返回的Future,所以main Task只能調(diào)用到這個Future的add_don_callback來裝載自己的__wakeup方法,最終導(dǎo)致到main Task只能托管到await asyncio.sleep(1.5)的Future,而mutli_task創(chuàng)建的task則逃逸了,成為另一條鏈的頂點Task。
不過這個程序的事件循環(huán)只管理到了main Task所以事件循環(huán)會一直運行,直到main Task運行結(jié)束的時候才退出,這時程序會跟著一起退出,所以程序的運行時間只有1.5秒左右。
此外由于另外的Task也是注冊到這個事件循環(huán)上面,所以事件循環(huán)會幫忙把task_1執(zhí)行完畢,而task_2定義的休眠時間是2秒,程序退出之前事件循環(huán)會發(fā)現(xiàn)有個Task尚未執(zhí)行完畢,于是會對這個Task進(jìn)行清理并打印一條警報。
4.總結(jié)
在深入了Task,F(xiàn)uture的源碼了解后,了解了Task和Future在Asyncio的作用,同時也發(fā)現(xiàn)Task和Future都跟loop有一定的耦合,而loop也可以通過一定的方法來創(chuàng)建Task和Future,所以如果要真正的理解到Asyncio的調(diào)度原理,還需要更進(jìn)入一步,通過Asyncio的源碼來了解整個Asyncio的設(shè)計。
到此這篇關(guān)于Python Asyncio中Coroutines,Tasks,Future可等待對象的關(guān)系及作用的文章就介紹到這了,更多相關(guān)Python Asyncio 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pandas刪除部分?jǐn)?shù)據(jù)后重新生成索引的實現(xiàn)
這篇文章主要介紹了pandas刪除部分?jǐn)?shù)據(jù)后重新生成索引的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07
使用Selenium在Python中實現(xiàn)錄屏功能
Selenium 是一個強大的用于自動化測試的工具,但你知道它也可以用來錄制瀏覽器操作的視頻嗎?本文將介紹如何使用 Selenium 在 Python 中實現(xiàn)錄屏功能,以便記錄和分享你的網(wǎng)頁操作過程,需要的朋友可以參考下2023-11-11
運行獨立 pyspark 時出現(xiàn) Windows 錯誤解決辦法
在本篇文章里小編給大家分享的是一篇關(guān)于運行獨立 pyspark 時出現(xiàn) Windows 錯誤解決辦法,對此有需求的方法可以參考下。2021-12-12
tensorflow中tf.reduce_mean函數(shù)的使用
這篇文章主要介紹了tensorflow中tf.reduce_mean函數(shù)的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04

