Python?Asyncio中Coroutines,Tasks,Future可等待對(duì)象的關(guān)系及作用
前記
上一遍文章《Python中Async語(yǔ)法協(xié)程的實(shí)現(xiàn)》介紹了Python是如何以生成器來(lái)實(shí)現(xiàn)協(xié)程的以及Python Asyncio通過(guò)Future和Task的封裝來(lái)實(shí)現(xiàn)協(xié)程的調(diào)度,而在Python Asyncio之中Coroutines, Tasks和Future都屬于可等待對(duì)象,在使用的Asyncio的過(guò)程中,經(jīng)常涉及到三者的轉(zhuǎn)換和調(diào)度,開(kāi)發(fā)者容易在概念和作用上犯迷糊,本文主要闡述的是三者之間的關(guān)系以及他們的作用。
1.Asyncio的入口
協(xié)程是線程中的一種特例,協(xié)程的入口和切換都是靠事件循環(huán)來(lái)調(diào)度的,在新版的Python
中協(xié)程的入口是Asyncio.run
,當(dāng)程序運(yùn)行到Asyncio.run
后,可以簡(jiǎn)單的理解為程序由線程模式切換為協(xié)程模式(只是方便理解,對(duì)于計(jì)算機(jī)而言,并沒(méi)有這樣區(qū)分),
以下是一個(gè)最小的協(xié)程例子代碼:
import asyncio async def main(): await asyncio.sleep(0) asyncio.run(main())
在這段代碼中,main
函數(shù)和asyncio.sleep
都屬于Coroutine,main
是通過(guò)asyncio.run
進(jìn)行調(diào)用的,接下來(lái)程序也進(jìn)入一個(gè)協(xié)程模式,asyncio.run
的核心調(diào)用是Runner.run
,它的代碼如下:
class Runner: ... def run(self, coro, *, context=None): """Run a coroutine inside the embedded event loop.""" # 省略代碼 ... # 把coroutine轉(zhuǎn)為task task = self._loop.create_task(coro, context=context) # 省略代碼 ... try: # 如果傳入的是Future或者coroutine,也會(huì)專為task return self._loop.run_until_complete(task) except exceptions.CancelledError: # 省略代碼 ...
這段代碼中刪去了部分其它功能和初始化的代碼,可以看到這段函數(shù)的主要功能是通過(guò)loop.create_task方法把一個(gè)Coroutine對(duì)象轉(zhuǎn)為一個(gè)Task對(duì)象,然后通過(guò)loop.run_until_complete等待這個(gè)Task運(yùn)行結(jié)束。
可以看到,Asycnio
并不會(huì)直接去調(diào)度Coroutine,而是把它轉(zhuǎn)為Task再進(jìn)行調(diào)度,這是因?yàn)樵?code>Asyncio中事件循環(huán)的最小調(diào)度對(duì)象就是Task。不過(guò)在Asyncio
中并不是所有的Coroutine的調(diào)用都會(huì)先被轉(zhuǎn)為Task對(duì)象再等待,比如示例代碼中的asyncio.sleep
,由于它是在main
函數(shù)中直接await的,所以它不會(huì)被進(jìn)行轉(zhuǎn)換,而是直接等待,通過(guò)調(diào)用工具分析展示的圖如下:
在這個(gè)圖示中,從main
函數(shù)到asyncio.sleep
函數(shù)中沒(méi)有明顯的loop.create_task
等把Coroutine轉(zhuǎn)為Task調(diào)用,這里之所以不用進(jìn)行轉(zhuǎn)換的原因不是做了一些特殊優(yōu)化,而是本因如此, 這個(gè)await asyncio.sleep
函數(shù)實(shí)際上還是會(huì)被main
這個(gè)Coroutine轉(zhuǎn)換成的Task
繼續(xù)調(diào)度到。
2.兩種Coroutine調(diào)用方法的區(qū)別
在了解Task
的調(diào)度原理之前,還是先回到最初的調(diào)用示例,看看直接用Task調(diào)用和直接用Coroutine調(diào)用的區(qū)別是什么。
如下代碼,我們顯示的執(zhí)行一個(gè)Coroutine轉(zhuǎn)為Task的操作再等待,那么代碼會(huì)變成下面這樣:
import asyncio async def main(): await asyncio.create_task(asyncio.sleep(0)) asyncio.run(main())
這樣的代碼看起來(lái)跟最初的調(diào)用示例很像,沒(méi)啥區(qū)別,但是如果進(jìn)行一些改變,比如增加一些休眠時(shí)間和Coroutine的調(diào)用,就能看出Task對(duì)象的作用了,現(xiàn)在編寫兩份文件,
他們的代碼如下:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604 # demo_task.py import asyncio import time async def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2 s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 2.0027475357055664
其中demo_coro.py
進(jìn)行了兩次await
調(diào)用,程序的運(yùn)行總時(shí)長(zhǎng)為3秒,而demo_task.py
則是先把兩個(gè)Coroutine對(duì)象轉(zhuǎn)為Task對(duì)象,然后再進(jìn)行兩次await
調(diào)用,程序的運(yùn)行總時(shí)長(zhǎng)為2秒。可以發(fā)現(xiàn),demo_task.py
的運(yùn)行時(shí)長(zhǎng)近似于其中運(yùn)行最久的Task對(duì)象時(shí)長(zhǎng),而demo_coro.py
的運(yùn)行時(shí)長(zhǎng)則是近似于兩個(gè)Coroutine對(duì)象的總運(yùn)行時(shí)長(zhǎng)。
之所以會(huì)是這樣的結(jié)果,是因?yàn)橹苯?code>awaitCoroutine對(duì)象時(shí),這段程序會(huì)一直等待,直到Coroutine對(duì)象執(zhí)行完畢再繼續(xù)往下走,而Task對(duì)象的不同之處就是在創(chuàng)建的那一刻,就已經(jīng)把自己注冊(cè)到事件循環(huán)之中等待被安排運(yùn)行了,然后返回一個(gè)task對(duì)象供開(kāi)發(fā)者等待,由于asyncio.sleep
是一個(gè)純IO類型的調(diào)用,所以在這個(gè)程序中,兩個(gè)asyncio.sleep
Coroutine被轉(zhuǎn)為Task從而實(shí)現(xiàn)了并發(fā)調(diào)用。
3.Task與Future
上述的代碼之所以通過(guò)Task能實(shí)現(xiàn)并發(fā)調(diào)用,是因?yàn)門ask中出現(xiàn)了一些與事件循環(huán)交互的函數(shù),正是這些函數(shù)架起了Coroutine并發(fā)調(diào)用的可能, 不過(guò)Task是Future的一個(gè)子對(duì)象,所以在了解Task之前,需要先了解Future。
3.1.Future
與Coroutine只有讓步和接收結(jié)果不同的是Future除了讓步和接收結(jié)果功能外,它還是一個(gè)只會(huì)被動(dòng)進(jìn)行事件調(diào)用且?guī)в袪顟B(tài)的容器,它在初始化時(shí)就是Pending
狀態(tài),這時(shí)可以被取消,被設(shè)置結(jié)果和設(shè)置異常。而在被設(shè)定對(duì)應(yīng)的操作后,F(xiàn)uture會(huì)被轉(zhuǎn)化到一個(gè)不可逆的對(duì)應(yīng)狀態(tài),并通過(guò)loop.call_sonn
來(lái)調(diào)用所有注冊(cè)到本身上的回調(diào)函數(shù),同時(shí)它帶有__iter__
和__await__
方法使其可以被await
和yield from
調(diào)用,它的主要代碼如下:
class Future: ... def set_result(self, result): """設(shè)置結(jié)果,并安排下一個(gè)調(diào)用""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() def set_exception(self, exception): """設(shè)置異常,并安排下一個(gè)調(diào)用""" if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: raise TypeError("StopIteration interacts badly with generators " "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED self.__schedule_callbacks() self.__log_traceback = True def __await__(self): """設(shè)置為blocking,并接受await或者yield from調(diào)用""" if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result() # May raise too. __iter__ = __await__ # make compatible with 'yield from'.
單看這段代碼是很難理解為什么下面這個(gè)future被調(diào)用set_result
后就能繼續(xù)往下走:
async def demo(future: asyncio.Future): await future print("aha")
這是因?yàn)镕uture跟Coroutine一樣,沒(méi)有主動(dòng)調(diào)度的能力,只能通過(guò)Task和事件循環(huán)聯(lián)手被調(diào)度。
3.2.Task
Task是Future的子類,除了繼承了Future的所有方法,它還多了兩個(gè)重要的方法__step
和__wakeup
,通過(guò)這兩個(gè)方法賦予了Task調(diào)度能力,這是Coroutine和Future沒(méi)有的,Task的涉及到調(diào)度的主要代碼如下(說(shuō)明見(jiàn)注釋):
class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. _log_destroy_pending = True def __init__(self, coro, *, loop=None, name=None, context=None): super().__init__(loop=loop) # 省略部分初始化代碼 ... # 托管的coroutine self._coro = coro if context is None: self._context = contextvars.copy_context() else: self._context = context # 通過(guò)loop.call_sonn,在Task初始化后馬上就通知事件循環(huán)在下次有空的時(shí)候執(zhí)行自己的__step函數(shù) self._loop.call_soon(self.__step, context=self._context) def __step(self, exc=None): coro = self._coro # 方便asyncio自省 _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: # 通過(guò)send預(yù)激托管的coroutine # 這時(shí)候只會(huì)得到coroutine yield回來(lái)的數(shù)據(jù)或者收到一個(gè)StopIteration的異常 # 對(duì)于Future或者Task返回的是Self result = coro.send(None) else: # 發(fā)送異常給coroutine result = coro.throw(exc) except StopIteration as exc: # StopIteration代表Coroutine運(yùn)行完畢 if self._must_cancel: # coroutine在停止之前被執(zhí)行了取消操作,則需要顯示的執(zhí)行取消操作 self._must_cancel = False super().cancel(msg=self._cancel_message) else: # 把運(yùn)行完畢的值發(fā)送到結(jié)果值中 super().set_result(exc.value) # 省略其它異常封裝 ... else: # 如果沒(méi)有異常拋出 blocking = getattr(result, '_asyncio_future_blocking', None) if blocking is not None: # 通過(guò)Future代碼可以判斷,如果帶有_asyncio_future_blocking屬性,則代表當(dāng)前result是Future或者是Task # 意味著這個(gè)Task里面裹著另外一個(gè)的Future或者Task # 省略Future判斷 ... if blocking: # 代表這這個(gè)Future或者Task處于卡住的狀態(tài), # 此時(shí)的Task放棄了自己對(duì)事件循環(huán)的控制權(quán),等待這個(gè)卡住的Future或者Task執(zhí)行完成時(shí)喚醒一下自己 result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: if self._fut_waiter.cancel(msg=self._cancel_message): self._must_cancel = False else: # 不能被await兩次 new_exc = RuntimeError( f'yield was used instead of yield from ' f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) elif result is None: # 放棄了對(duì)事件循環(huán)的控制權(quán),代表自己托管的coroutine可能有個(gè)coroutine在運(yùn)行,接下來(lái)會(huì)把控制權(quán)交給他和事件循環(huán) # 當(dāng)前的coroutine里面即使沒(méi)有Future或者Task,但是子Future可能有 self._loop.call_soon(self.__step, context=self._context) finally: _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): # 其它Task和Future完成后會(huì)調(diào)用到該函數(shù),接下來(lái)進(jìn)行一些處理 try: # 回收Future的狀態(tài),如果Future發(fā)生了異常,則把異常傳回給自己 future.result() except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: # Task并不需要自己托管的Future的結(jié)果值,而且如下注釋,這樣能使調(diào)度變得更快 # Don't pass the value of `future.result()` explicitly, # as `Future.__iter__` and `Future.__await__` don't need it. # If we call `_step(value, None)` instead of `_step()`, # Python eval loop would use `.send(value)` method call, # instead of `__next__()`, which is slower for futures # that return non-generator iterators from their `__iter__`. self.__step() self = None # Needed to break cycles when an exception occurs.
這份源碼的Task對(duì)象中的__setp
方法比較長(zhǎng),經(jīng)過(guò)精簡(jiǎn)后可以發(fā)現(xiàn)他主要做的工作有三個(gè):
- 1.通過(guò)
send
或者throw
來(lái)驅(qū)動(dòng)Coroutine進(jìn)行下一步 - 2.通過(guò)給被自己托管的Future或者Task添加回調(diào)來(lái)獲得完成的通知并重新獲取控制權(quán)
- 3.通過(guò)
loop.call_soon
來(lái)讓步,把控制權(quán)交給事件循環(huán)
單通過(guò)源碼分析可能很難明白, 以下是以兩種Coroutine
的代碼為例子,簡(jiǎn)單的闡述Task與事件循環(huán)調(diào)度的過(guò)程,首先是demo_coro
,這個(gè)例子中只有一個(gè)Task:
# demo_coro.py import asyncio import time async def main(): await asyncio.sleep(1) await asyncio.sleep(2) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 3.0028765201568604
這個(gè)例子中第一步是把main
轉(zhuǎn)為一個(gè)Task,然后調(diào)用到了對(duì)應(yīng)的__step
方法,這時(shí)候__step
方法會(huì)會(huì)調(diào)用main()
這個(gè)Coroutine的send(None)
方法。
之后整個(gè)程序的邏輯會(huì)直接轉(zhuǎn)到main
函數(shù)中的await asyncio.sleep(1)
這個(gè)Coroutine中,await asyncio.sleep(1)
會(huì)先生成一個(gè)Future對(duì)象,并通過(guò)loop.call_at
告訴事件循環(huán)在1秒后激活這個(gè)Future對(duì)象,然后把對(duì)象返回。這時(shí)候邏輯會(huì)重新回到Task的__step
方法中,__step
發(fā)現(xiàn)send
調(diào)用得到的是一個(gè)Future對(duì)象,所以就在這個(gè)Future添加一個(gè)回調(diào),讓Future完成的時(shí)候來(lái)激活自己,然后放棄了對(duì)事件循環(huán)的控制權(quán)。接著就是事件循環(huán)在一秒后激活了這個(gè)Future對(duì)象,這時(shí)程序邏輯就會(huì)執(zhí)行到Future的回調(diào),也就是Task的__wakeup
方法,于是Task的__step
又被調(diào)用到了,而這次遇到的是后面的await asyncio.sleep(2)
,于是又走了一遍上面的流程。當(dāng)兩個(gè)asyncio.sleep
都執(zhí)行完成后,Task的__step
方法里在對(duì)Coroutine發(fā)送一個(gè)send(None)
后就捕獲到了StopIteration
異常,這時(shí)候Task就會(huì)通過(guò)set_result
設(shè)置結(jié)果,并結(jié)束自己的調(diào)度流程。
可以看到demo_core.py
中只有一個(gè)Task在負(fù)責(zé)和事件循環(huán)一起調(diào)度,事件循環(huán)的開(kāi)始一定是一個(gè)Task,并通過(guò)Task來(lái)調(diào)起一個(gè)Coroutine,通過(guò)__step
方法把后續(xù)的Future,Task,Coroutine都當(dāng)成一條鏈來(lái)運(yùn)行,而demo_task.py
則不一樣了,它有兩個(gè)Task,代碼如下:
# demo_task.py import asyncio import time async def main(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) await task_1 await task_2 s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 2.0027475357055664
這個(gè)例子中第一步還是跟demo_coro
一樣,但跳轉(zhuǎn)到main
函數(shù)后就開(kāi)始有區(qū)別了,首先在這函數(shù)中創(chuàng)建了task1和task2兩個(gè)Task,他們分別都會(huì)通過(guò)__step
方法中的send
激活對(duì)應(yīng)的asyncio.sleep
Coroutine,然后等待對(duì)應(yīng)的Future來(lái)通知自己已經(jīng)完成了。而對(duì)于創(chuàng)建了這兩個(gè)Task的main Task來(lái)說(shuō),通過(guò)main
函數(shù)的awati task_1
和await task_2
來(lái)獲取到他們的“控制權(quán)“。首先是通過(guò)await task_1
語(yǔ)句,main Task中的__step
方法里在調(diào)用send
后得到的是task_1對(duì)應(yīng)的Future,這時(shí)候就可以為這個(gè)Future添加一個(gè)回調(diào),讓他完成時(shí)通知自己,自己再走下一步,對(duì)于task_2也是如此。 直到最后兩個(gè)task都執(zhí)行完成,main Task也捕獲到了StopIteration
異常,通過(guò)set_result
設(shè)置結(jié)果,并結(jié)束自己的調(diào)度流程。
可以看到demo_task.py
與demo_coro.py
有個(gè)明顯的區(qū)別在于main Task在運(yùn)行的生命周期中創(chuàng)建了兩個(gè)Task,并通過(guò)await
托管了兩個(gè)Task,同時(shí)兩個(gè)Task又能實(shí)現(xiàn)兩個(gè)協(xié)程的并發(fā),所以可以發(fā)現(xiàn)事件循環(huán)運(yùn)行期間,當(dāng)前協(xié)程的并發(fā)數(shù)永遠(yuǎn)小于事件循環(huán)中注冊(cè)的Task數(shù)量。此外,如果在main Task中如果沒(méi)有顯式的進(jìn)行await
,那么子Task就會(huì)逃逸,不受main Task管理,如下:
# demo_task.py import asyncio import time def mutli_task(): task_1 = asyncio.create_task(asyncio.sleep(1)) task_2 = asyncio.create_task(asyncio.sleep(2)) async def main(): mutli_task() await asyncio.sleep(1.5) s_t = time.time() asyncio.run(main()) print(time.time() - s_t) # // Output: 1.5027475357055664
在這段代碼中,main Task在執(zhí)行到mutli_task
時(shí),會(huì)創(chuàng)建出兩個(gè)task,但是在__step
中的coro.send(None)
調(diào)用得到的結(jié)果卻是await asyncio.sleep(1.5)
返回的Future,所以main Task只能調(diào)用到這個(gè)Future的add_don_callback
來(lái)裝載自己的__wakeup
方法,最終導(dǎo)致到main Task只能托管到await asyncio.sleep(1.5)
的Future,而mutli_task
創(chuàng)建的task則逃逸了,成為另一條鏈的頂點(diǎn)Task。
不過(guò)這個(gè)程序的事件循環(huán)只管理到了main Task
所以事件循環(huán)會(huì)一直運(yùn)行,直到main Task
運(yùn)行結(jié)束的時(shí)候才退出,這時(shí)程序會(huì)跟著一起退出,所以程序的運(yùn)行時(shí)間只有1.5秒左右。
此外由于另外的Task也是注冊(cè)到這個(gè)事件循環(huán)上面,所以事件循環(huán)會(huì)幫忙把task_1執(zhí)行完畢,而task_2定義的休眠時(shí)間是2秒,程序退出之前事件循環(huán)會(huì)發(fā)現(xiàn)有個(gè)Task尚未執(zhí)行完畢,于是會(huì)對(duì)這個(gè)Task進(jìn)行清理并打印一條警報(bào)。
4.總結(jié)
在深入了Task,F(xiàn)uture的源碼了解后,了解了Task和Future在Asyncio
的作用,同時(shí)也發(fā)現(xiàn)Task和Future都跟loop有一定的耦合,而loop也可以通過(guò)一定的方法來(lái)創(chuàng)建Task和Future,所以如果要真正的理解到Asyncio
的調(diào)度原理,還需要更進(jìn)入一步,通過(guò)Asyncio
的源碼來(lái)了解整個(gè)Asyncio
的設(shè)計(jì)。
到此這篇關(guān)于Python Asyncio中Coroutines,Tasks,Future可等待對(duì)象的關(guān)系及作用的文章就介紹到這了,更多相關(guān)Python Asyncio 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pandas刪除部分?jǐn)?shù)據(jù)后重新生成索引的實(shí)現(xiàn)
這篇文章主要介紹了pandas刪除部分?jǐn)?shù)據(jù)后重新生成索引的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07使用Selenium在Python中實(shí)現(xiàn)錄屏功能
Selenium 是一個(gè)強(qiáng)大的用于自動(dòng)化測(cè)試的工具,但你知道它也可以用來(lái)錄制瀏覽器操作的視頻嗎?本文將介紹如何使用 Selenium 在 Python 中實(shí)現(xiàn)錄屏功能,以便記錄和分享你的網(wǎng)頁(yè)操作過(guò)程,需要的朋友可以參考下2023-11-11運(yùn)行獨(dú)立 pyspark 時(shí)出現(xiàn) Windows 錯(cuò)誤解決辦法
在本篇文章里小編給大家分享的是一篇關(guān)于運(yùn)行獨(dú)立 pyspark 時(shí)出現(xiàn) Windows 錯(cuò)誤解決辦法,對(duì)此有需求的方法可以參考下。2021-12-12python定位xpath 節(jié)點(diǎn)位置的方法
今天小編就為大家分享一篇python定位xpath 節(jié)點(diǎn)位置的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08python 輸出列表元素實(shí)例(以空格/逗號(hào)為分隔符)
今天小編就為大家分享一篇python 輸出列表元素實(shí)例(以空格/逗號(hào)為分隔符),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12在Python中使用Mako模版庫(kù)的簡(jiǎn)單教程
這篇文章主要介紹了在Python中使用Mako模版庫(kù)的簡(jiǎn)單教程,包括在Django或者Tornado框架中集成Mako的方法,需要的朋友可以參考下2015-04-04tensorflow中tf.reduce_mean函數(shù)的使用
這篇文章主要介紹了tensorflow中tf.reduce_mean函數(shù)的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04