Python Coroutine池化的實現(xiàn)詳解
池化介紹
在當今計算機科學和軟件工程的領域中,池化技術如線程池、連接池和對象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具。然而,在 Python 的協(xié)程領域,我們卻很少見到類似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。為什么會這樣呢?
首先,Python Coroutine 的特性使得池化技術在協(xié)程中的應用相對較少。與像 Golang 這樣支持有棧協(xié)程的語言不同,Python Coroutine 是無棧的,無法跨核執(zhí)行,從而限制了協(xié)程池發(fā)揮多核優(yōu)勢的可能性。
其次,Python Coroutine 的輕量級和快速創(chuàng)建銷毀的特性,使得頻繁創(chuàng)建和銷毀協(xié)程并不會帶來顯著的性能損耗。這也解釋了為什么 Python 官方一直沒有引入 CoroutinePoolExecutor。
然而,作為開發(fā)者,我們?nèi)匀豢梢栽谔囟▓鼍跋驴紤]協(xié)程的池化。雖然 Python Coroutine 輕量,但在一些需要大量協(xié)程協(xié)同工作的應用中,池化技術能夠提供更方便、統(tǒng)一的調(diào)度子協(xié)程的方式。尤其是在涉及到異步操作的同時需要控制并發(fā)數(shù)量時,協(xié)程池的優(yōu)勢就顯而易見了。
關于 Python 官方是否會在未來引入類似于 TaskGroup 的 CoroutinePoolExecutor,這或許是一個懸而未決的問題??紤]到 Python 在異步編程方面的快速發(fā)展,我們不能排除未來可能性的存在。或許有一天,我們會看到 TaskGroup 引入一個 max_workers 的形參,以更好地支持對協(xié)程池的需求。
在實際開發(fā)中,我們也可以嘗試編寫自己的 CoroutinePoolExecutor,以滿足特定業(yè)務場景的需求。通過合理的設計架構(gòu)和對數(shù)據(jù)流的全局考慮,我們可以最大程度地發(fā)揮協(xié)程池的優(yōu)勢,提高系統(tǒng)的性能和響應速度。
在接下來的文章中,我們將探討如何設計和實現(xiàn)一個簡單的 CoroutinePoolExecutor,以及在實際項目中的應用場景。通過深入理解協(xié)程池的工作原理,我們或許能更好地利用這一技術,使我們的異步應用更為高效。
如何開始編寫
如何開始編寫 CoroutinePoolExecutor,首先我們要明確出其適用范疇、考慮到使用方式和其潛在的風險點:
- 它并不適用于 Mult Thread + Mult Event Loop 的場景,因此它并非線程安全的。
- 應當保持和 ThreadPoolExecutor 相同的調(diào)用方式。
- 不同于 Mult Thread 中子線程不依賴于主線程的運行,而在 Mult Coroutine 中子協(xié)程必須依賴于主協(xié)程,因此主協(xié)程在子協(xié)程沒有全部運行完畢之前不能直接 done 掉。這也解釋了為什么 TaskGroup 官方實現(xiàn)中沒有提供類似于 shutdown 之類的方法,而是只提供上下文管理的運行方式。
有了上述 3 點的考量,我們決定將 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。這樣的好處在于,作為學習者一方面可以了解 ThreadPoolExecutor 的內(nèi)部實現(xiàn)機制,另一方面站在巨人肩膀上的編程借鑒往往會事半功倍,對于自我的提升也是較為明顯的。
在考慮這些因素的同時,我們將繼續(xù)深入研究協(xié)程池的設計和實現(xiàn)。通過對適用范圍和使用方式的明確,我們能更好地把握 CoroutinePoolExecutor 的潛在優(yōu)勢,為異步應用的性能提升做出更有針對性的貢獻。
具體代碼實現(xiàn)
在這里我先貼出完整的代碼實現(xiàn),其中著重點已經(jīng)用注釋標明。
以下是 CoroutinePoolExecutor 的代碼實現(xiàn):
import os
import asyncio
import weakref
import logging
import itertools
async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
try:
while True:
work_item = await work_queue.get()
if work_item is not None:
await work_item.run()
del work_item
executor = executor_reference()
if executor is not None:
# Notify available coroutines
executor._idle_semaphore.release()
del executor
continue
# Notifies the next coroutine task that it is time to exit
await work_queue.put(None)
break
except Exception as exc:
logging.critical('Exception in worker', exc_info=True)
class _WorkItem:
def __init__(self, future, coro):
self.future = future
self.coro = coro
async def run(self):
try:
result = await self.coro
except Exception as exc:
self.future.set_exception(exc)
else:
self.future.set_result(result)
class CoroutinePoolExecutor:
"""
Coroutine pool implemented based on ThreadPoolExecutor
Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
"""
# Used to assign unique thread names when coroutine_name_prefix is not supplied.
_counter = itertools.count().__next__
def __init__(self, max_workers, coroutine_name_prefix=""):
if max_workers is None:
max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
self._work_queue = asyncio.Queue()
self._idle_semaphore = asyncio.Semaphore(0)
self._coroutines = set()
self._shutdown = False
self._shutdown_lock = asyncio.Lock()
self._coroutine_name_prefix = (coroutine_name_prefix or (
f"{__class__.__name__}-{self._counter()}"
))
async def submit(self, coro):
async with self._shutdown_lock:
# When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
# This is because after shutdown, all sub-coroutines will end their work
# one after another. Even if there are new coroutine tasks, they will not
# be reactivated.
if self._shutdown:
raise RuntimeError('cannot schedule new coroutine task after shutdown')
f = asyncio.Future()
w = _WorkItem(
f,
coro
)
await self._work_queue.put(w)
await self._adjust_coroutine_count()
return f
async def _adjust_coroutine_count(self):
try:
# 2 functions:
# - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
# - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
# Since the Semaphore provided by asyncio does not have a timeout
# parameter, you can choose to use it with wait_for.
if await asyncio.wait_for(
self._idle_semaphore.acquire(),
0
):
return
except TimeoutError:
pass
num_coroutines = len(self._coroutines)
if num_coroutines < self._max_workers:
coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
t = asyncio.create_task(
coro=_worker(
weakref.ref(self),
self._work_queue
),
name=coroutine_name
)
self._coroutines.add(t)
async def shutdown(self, wait=True, *, cancel_futures=False):
async with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
while True:
try:
work_item = self._work_queue.get_nowait()
except asyncio.QueueEmpty:
break
if work_item is not None:
work_item.future.cancel()
# None is an exit signal, given by the shutdown method, when the shutdown method is called
# will notify the sub-coroutine to stop working and exit the loop
await self._work_queue.put(None)
if wait:
for t in self._coroutines:
await t
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.shutdown(wait=True)
return False
以下是 CoroutinePoolExecutor 的使用方式:
import asyncio
from coroutinepoolexecutor import CoroutinePoolExecutor
async def task(i):
await asyncio.sleep(1)
print(f"task-{i}")
async def main():
async with CoroutinePoolExecutor(2) as executor:
for i in range(10):
await executor.submit(task(i))
if __name__ == "__main__":
asyncio.run(main())
我們知道,在線程池中,工作線程一旦創(chuàng)建會不斷的領取新的任務并執(zhí)行,除開 shutdown() 調(diào)用,否則對于靜態(tài)的線程池來講工作線程不會自己結(jié)束。
在上述協(xié)程池代碼實現(xiàn)中,CoroutinePoolExecutor 類包含了主要的對外調(diào)用功能的接口、內(nèi)部提供了存儲 task 的 Queue、工作協(xié)程自動生成 name 的計數(shù)器、保障協(xié)程的信號量鎖等等。
而 _worker 函數(shù)是工作協(xié)程的運行函數(shù),其會在工作協(xié)程啟動后,不斷的從 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具體執(zhí)行 coro task。
剩下的 _WorkItem 是一個 future 對象與 coro task 的封裝器,其功能是解耦 future 對象和 coro task、并在 coro task 運行時和運行后設置 future 的結(jié)果。
對于異步循環(huán)的思考
在此 CoroutinePoolExecutor 實現(xiàn)后,我其實又有了一個新的思考。Python 的 EventLoop 相較于 Node.js 的 EventLoop 來說其實更加的底層,它有感的暴露了出來。
具體體現(xiàn)在當 Python Event Loop 啟動后,如果 main coroutine 停止運行,那么所有的 subtask coroutine 也會停止運行,尤其是對于一些需要清理資源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都會在某些情況顯得無措,說的更具體點就是不知道在什么時候調(diào)用。
對于這些問題,我們可以繼承 BaseEventLoop 自己手動對 EventLoop 的功能進行擴展,如在事件循環(huán)關閉之前添加 hook function,甚至可以限制整個 EventLoop 的 max_workers 或者做成動態(tài)的可調(diào)節(jié) coroutine 數(shù)量的 EventLoop 都行。
無論如何,只要心里有想法,就可以去將它實現(xiàn) .. 學習本身就是一個不斷挑戰(zhàn)的過程。
到此這篇關于Python Coroutine池化的實現(xiàn)詳解的文章就介紹到這了,更多相關Python Coroutine池化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
一個基于flask的web應用誕生 用戶注冊功能開發(fā)(5)
一個基于flask的web應用誕生第五篇,這篇文章主要介紹了用戶注冊功能開發(fā),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04
python使用代理IP爬取貓眼電影專業(yè)評分數(shù)據(jù)
在編寫爬蟲程序的過程中,IP封鎖無疑是一個常見且棘手的問題,盡管網(wǎng)絡上存在大量的免費IP代理網(wǎng)站,但其質(zhì)量往往參差不齊,令人堪憂,本篇文章中介紹一下如何使用Python的Requests庫和BeautifulSoup庫來抓取貓眼電影網(wǎng)站上的專業(yè)評分數(shù)據(jù),需要的朋友可以參考下2024-03-03
對python_discover方法遍歷所有執(zhí)行的用例詳解
今天小編就為大家分享一篇對python_discover方法遍歷所有執(zhí)行的用例詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02
python數(shù)字圖像處理之圖像簡單濾波實現(xiàn)
這篇文章主要為大家介紹了python數(shù)字圖像處理之圖像簡單濾波實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06

