Python Coroutine池化的實(shí)現(xiàn)詳解
池化介紹
在當(dāng)今計(jì)算機(jī)科學(xué)和軟件工程的領(lǐng)域中,池化技術(shù)如線程池、連接池和對(duì)象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具。然而,在 Python 的協(xié)程領(lǐng)域,我們卻很少見到類似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。為什么會(huì)這樣呢?
首先,Python Coroutine 的特性使得池化技術(shù)在協(xié)程中的應(yīng)用相對(duì)較少。與像 Golang 這樣支持有棧協(xié)程的語(yǔ)言不同,Python Coroutine 是無(wú)棧的,無(wú)法跨核執(zhí)行,從而限制了協(xié)程池發(fā)揮多核優(yōu)勢(shì)的可能性。
其次,Python Coroutine 的輕量級(jí)和快速創(chuàng)建銷毀的特性,使得頻繁創(chuàng)建和銷毀協(xié)程并不會(huì)帶來(lái)顯著的性能損耗。這也解釋了為什么 Python 官方一直沒(méi)有引入 CoroutinePoolExecutor。
然而,作為開發(fā)者,我們?nèi)匀豢梢栽谔囟▓?chǎng)景下考慮協(xié)程的池化。雖然 Python Coroutine 輕量,但在一些需要大量協(xié)程協(xié)同工作的應(yīng)用中,池化技術(shù)能夠提供更方便、統(tǒng)一的調(diào)度子協(xié)程的方式。尤其是在涉及到異步操作的同時(shí)需要控制并發(fā)數(shù)量時(shí),協(xié)程池的優(yōu)勢(shì)就顯而易見了。
關(guān)于 Python 官方是否會(huì)在未來(lái)引入類似于 TaskGroup 的 CoroutinePoolExecutor,這或許是一個(gè)懸而未決的問(wèn)題??紤]到 Python 在異步編程方面的快速發(fā)展,我們不能排除未來(lái)可能性的存在?;蛟S有一天,我們會(huì)看到 TaskGroup 引入一個(gè) max_workers 的形參,以更好地支持對(duì)協(xié)程池的需求。
在實(shí)際開發(fā)中,我們也可以嘗試編寫自己的 CoroutinePoolExecutor,以滿足特定業(yè)務(wù)場(chǎng)景的需求。通過(guò)合理的設(shè)計(jì)架構(gòu)和對(duì)數(shù)據(jù)流的全局考慮,我們可以最大程度地發(fā)揮協(xié)程池的優(yōu)勢(shì),提高系統(tǒng)的性能和響應(yīng)速度。
在接下來(lái)的文章中,我們將探討如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 CoroutinePoolExecutor,以及在實(shí)際項(xiàng)目中的應(yīng)用場(chǎng)景。通過(guò)深入理解協(xié)程池的工作原理,我們或許能更好地利用這一技術(shù),使我們的異步應(yīng)用更為高效。
如何開始編寫
如何開始編寫 CoroutinePoolExecutor,首先我們要明確出其適用范疇、考慮到使用方式和其潛在的風(fēng)險(xiǎn)點(diǎn):
- 它并不適用于 Mult Thread + Mult Event Loop 的場(chǎng)景,因此它并非線程安全的。
- 應(yīng)當(dāng)保持和 ThreadPoolExecutor 相同的調(diào)用方式。
- 不同于 Mult Thread 中子線程不依賴于主線程的運(yùn)行,而在 Mult Coroutine 中子協(xié)程必須依賴于主協(xié)程,因此主協(xié)程在子協(xié)程沒(méi)有全部運(yùn)行完畢之前不能直接 done 掉。這也解釋了為什么 TaskGroup 官方實(shí)現(xiàn)中沒(méi)有提供類似于 shutdown 之類的方法,而是只提供上下文管理的運(yùn)行方式。
有了上述 3 點(diǎn)的考量,我們決定將 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。這樣的好處在于,作為學(xué)習(xí)者一方面可以了解 ThreadPoolExecutor 的內(nèi)部實(shí)現(xiàn)機(jī)制,另一方面站在巨人肩膀上的編程借鑒往往會(huì)事半功倍,對(duì)于自我的提升也是較為明顯的。
在考慮這些因素的同時(shí),我們將繼續(xù)深入研究協(xié)程池的設(shè)計(jì)和實(shí)現(xiàn)。通過(guò)對(duì)適用范圍和使用方式的明確,我們能更好地把握 CoroutinePoolExecutor 的潛在優(yōu)勢(shì),為異步應(yīng)用的性能提升做出更有針對(duì)性的貢獻(xiàn)。
具體代碼實(shí)現(xiàn)
在這里我先貼出完整的代碼實(shí)現(xiàn),其中著重點(diǎn)已經(jīng)用注釋標(biāo)明。
以下是 CoroutinePoolExecutor 的代碼實(shí)現(xiàn):
import os import asyncio import weakref import logging import itertools async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue): try: while True: work_item = await work_queue.get() if work_item is not None: await work_item.run() del work_item executor = executor_reference() if executor is not None: # Notify available coroutines executor._idle_semaphore.release() del executor continue # Notifies the next coroutine task that it is time to exit await work_queue.put(None) break except Exception as exc: logging.critical('Exception in worker', exc_info=True) class _WorkItem: def __init__(self, future, coro): self.future = future self.coro = coro async def run(self): try: result = await self.coro except Exception as exc: self.future.set_exception(exc) else: self.future.set_result(result) class CoroutinePoolExecutor: """ Coroutine pool implemented based on ThreadPoolExecutor Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine So you must use the shutdown method to wait for all subtasks and wait for them to complete execution """ # Used to assign unique thread names when coroutine_name_prefix is not supplied. _counter = itertools.count().__next__ def __init__(self, max_workers, coroutine_name_prefix=""): if max_workers is None: max_workers = min(32, (os.cpu_count() or 1) + 4) if max_workers <= 0: raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers self._work_queue = asyncio.Queue() self._idle_semaphore = asyncio.Semaphore(0) self._coroutines = set() self._shutdown = False self._shutdown_lock = asyncio.Lock() self._coroutine_name_prefix = (coroutine_name_prefix or ( f"{__class__.__name__}-{self._counter()}" )) async def submit(self, coro): async with self._shutdown_lock: # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed. # This is because after shutdown, all sub-coroutines will end their work # one after another. Even if there are new coroutine tasks, they will not # be reactivated. if self._shutdown: raise RuntimeError('cannot schedule new coroutine task after shutdown') f = asyncio.Future() w = _WorkItem( f, coro ) await self._work_queue.put(w) await self._adjust_coroutine_count() return f async def _adjust_coroutine_count(self): try: # 2 functions: # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine. # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified # Since the Semaphore provided by asyncio does not have a timeout # parameter, you can choose to use it with wait_for. if await asyncio.wait_for( self._idle_semaphore.acquire(), 0 ): return except TimeoutError: pass num_coroutines = len(self._coroutines) if num_coroutines < self._max_workers: coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}" t = asyncio.create_task( coro=_worker( weakref.ref(self), self._work_queue ), name=coroutine_name ) self._coroutines.add(t) async def shutdown(self, wait=True, *, cancel_futures=False): async with self._shutdown_lock: self._shutdown = True if cancel_futures: while True: try: work_item = self._work_queue.get_nowait() except asyncio.QueueEmpty: break if work_item is not None: work_item.future.cancel() # None is an exit signal, given by the shutdown method, when the shutdown method is called # will notify the sub-coroutine to stop working and exit the loop await self._work_queue.put(None) if wait: for t in self._coroutines: await t async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.shutdown(wait=True) return False
以下是 CoroutinePoolExecutor 的使用方式:
import asyncio from coroutinepoolexecutor import CoroutinePoolExecutor async def task(i): await asyncio.sleep(1) print(f"task-{i}") async def main(): async with CoroutinePoolExecutor(2) as executor: for i in range(10): await executor.submit(task(i)) if __name__ == "__main__": asyncio.run(main())
我們知道,在線程池中,工作線程一旦創(chuàng)建會(huì)不斷的領(lǐng)取新的任務(wù)并執(zhí)行,除開 shutdown() 調(diào)用,否則對(duì)于靜態(tài)的線程池來(lái)講工作線程不會(huì)自己結(jié)束。
在上述協(xié)程池代碼實(shí)現(xiàn)中,CoroutinePoolExecutor 類包含了主要的對(duì)外調(diào)用功能的接口、內(nèi)部提供了存儲(chǔ) task 的 Queue、工作協(xié)程自動(dòng)生成 name 的計(jì)數(shù)器、保障協(xié)程的信號(hào)量鎖等等。
而 _worker 函數(shù)是工作協(xié)程的運(yùn)行函數(shù),其會(huì)在工作協(xié)程啟動(dòng)后,不斷的從 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具體執(zhí)行 coro task。
剩下的 _WorkItem 是一個(gè) future 對(duì)象與 coro task 的封裝器,其功能是解耦 future 對(duì)象和 coro task、并在 coro task 運(yùn)行時(shí)和運(yùn)行后設(shè)置 future 的結(jié)果。
對(duì)于異步循環(huán)的思考
在此 CoroutinePoolExecutor 實(shí)現(xiàn)后,我其實(shí)又有了一個(gè)新的思考。Python 的 EventLoop 相較于 Node.js 的 EventLoop 來(lái)說(shuō)其實(shí)更加的底層,它有感的暴露了出來(lái)。
具體體現(xiàn)在當(dāng) Python Event Loop 啟動(dòng)后,如果 main coroutine 停止運(yùn)行,那么所有的 subtask coroutine 也會(huì)停止運(yùn)行,尤其是對(duì)于一些需要清理資源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都會(huì)在某些情況顯得無(wú)措,說(shuō)的更具體點(diǎn)就是不知道在什么時(shí)候調(diào)用。
對(duì)于這些問(wèn)題,我們可以繼承 BaseEventLoop 自己手動(dòng)對(duì) EventLoop 的功能進(jìn)行擴(kuò)展,如在事件循環(huán)關(guān)閉之前添加 hook function,甚至可以限制整個(gè) EventLoop 的 max_workers 或者做成動(dòng)態(tài)的可調(diào)節(jié) coroutine 數(shù)量的 EventLoop 都行。
無(wú)論如何,只要心里有想法,就可以去將它實(shí)現(xiàn) .. 學(xué)習(xí)本身就是一個(gè)不斷挑戰(zhàn)的過(guò)程。
到此這篇關(guān)于Python Coroutine池化的實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Python Coroutine池化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Zabbix實(shí)現(xiàn)微信報(bào)警功能
這篇文章主要介紹了Zabbix實(shí)現(xiàn)微信報(bào)警的相關(guān)資料,本文圖文并茂介紹的非常詳細(xì),需要的朋友可以參考下2016-10-10一個(gè)基于flask的web應(yīng)用誕生 用戶注冊(cè)功能開發(fā)(5)
一個(gè)基于flask的web應(yīng)用誕生第五篇,這篇文章主要介紹了用戶注冊(cè)功能開發(fā),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04python使用代理IP爬取貓眼電影專業(yè)評(píng)分?jǐn)?shù)據(jù)
在編寫爬蟲程序的過(guò)程中,IP封鎖無(wú)疑是一個(gè)常見且棘手的問(wèn)題,盡管網(wǎng)絡(luò)上存在大量的免費(fèi)IP代理網(wǎng)站,但其質(zhì)量往往參差不齊,令人堪憂,本篇文章中介紹一下如何使用Python的Requests庫(kù)和BeautifulSoup庫(kù)來(lái)抓取貓眼電影網(wǎng)站上的專業(yè)評(píng)分?jǐn)?shù)據(jù),需要的朋友可以參考下2024-03-03在Python中處理日期和時(shí)間的基本知識(shí)點(diǎn)整理匯總
這篇文章主要介紹了在Python中處理日期和時(shí)間的基本知識(shí)點(diǎn)整理匯總,是Python入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-05-05對(duì)python_discover方法遍歷所有執(zhí)行的用例詳解
今天小編就為大家分享一篇對(duì)python_discover方法遍歷所有執(zhí)行的用例詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02Python編程實(shí)現(xiàn)簡(jiǎn)易的音樂(lè)播放器基本操作
這篇文章主要來(lái)教大家利用Python編程來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)易的音樂(lè)播放器,文中含有基本功能的操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10python數(shù)字圖像處理之圖像簡(jiǎn)單濾波實(shí)現(xiàn)
這篇文章主要為大家介紹了python數(shù)字圖像處理之圖像簡(jiǎn)單濾波實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06