欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python Coroutine池化的實(shí)現(xiàn)詳解

 更新時(shí)間:2024年01月29日 13:55:28   作者:.Hanabi  
在當(dāng)今計(jì)算機(jī)科學(xué)和軟件工程的領(lǐng)域中,池化技術(shù)如線程池、連接池和對(duì)象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具,所以下面我們就來(lái)看看Coroutine池化的具體實(shí)現(xiàn)吧

池化介紹

在當(dāng)今計(jì)算機(jī)科學(xué)和軟件工程的領(lǐng)域中,池化技術(shù)如線程池、連接池和對(duì)象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具。然而,在 Python 的協(xié)程領(lǐng)域,我們卻很少見到類似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。為什么會(huì)這樣呢?

首先,Python Coroutine 的特性使得池化技術(shù)在協(xié)程中的應(yīng)用相對(duì)較少。與像 Golang 這樣支持有棧協(xié)程的語(yǔ)言不同,Python Coroutine 是無(wú)棧的,無(wú)法跨核執(zhí)行,從而限制了協(xié)程池發(fā)揮多核優(yōu)勢(shì)的可能性。

其次,Python Coroutine 的輕量級(jí)和快速創(chuàng)建銷毀的特性,使得頻繁創(chuàng)建和銷毀協(xié)程并不會(huì)帶來(lái)顯著的性能損耗。這也解釋了為什么 Python 官方一直沒(méi)有引入 CoroutinePoolExecutor。

然而,作為開發(fā)者,我們?nèi)匀豢梢栽谔囟▓?chǎng)景下考慮協(xié)程的池化。雖然 Python Coroutine 輕量,但在一些需要大量協(xié)程協(xié)同工作的應(yīng)用中,池化技術(shù)能夠提供更方便、統(tǒng)一的調(diào)度子協(xié)程的方式。尤其是在涉及到異步操作的同時(shí)需要控制并發(fā)數(shù)量時(shí),協(xié)程池的優(yōu)勢(shì)就顯而易見了。

關(guān)于 Python 官方是否會(huì)在未來(lái)引入類似于 TaskGroup 的 CoroutinePoolExecutor,這或許是一個(gè)懸而未決的問(wèn)題??紤]到 Python 在異步編程方面的快速發(fā)展,我們不能排除未來(lái)可能性的存在?;蛟S有一天,我們會(huì)看到 TaskGroup 引入一個(gè) max_workers 的形參,以更好地支持對(duì)協(xié)程池的需求。

在實(shí)際開發(fā)中,我們也可以嘗試編寫自己的 CoroutinePoolExecutor,以滿足特定業(yè)務(wù)場(chǎng)景的需求。通過(guò)合理的設(shè)計(jì)架構(gòu)和對(duì)數(shù)據(jù)流的全局考慮,我們可以最大程度地發(fā)揮協(xié)程池的優(yōu)勢(shì),提高系統(tǒng)的性能和響應(yīng)速度。

在接下來(lái)的文章中,我們將探討如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 CoroutinePoolExecutor,以及在實(shí)際項(xiàng)目中的應(yīng)用場(chǎng)景。通過(guò)深入理解協(xié)程池的工作原理,我們或許能更好地利用這一技術(shù),使我們的異步應(yīng)用更為高效。

如何開始編寫

如何開始編寫 CoroutinePoolExecutor,首先我們要明確出其適用范疇、考慮到使用方式和其潛在的風(fēng)險(xiǎn)點(diǎn):

  • 它并不適用于 Mult Thread + Mult Event Loop 的場(chǎng)景,因此它并非線程安全的。
  • 應(yīng)當(dāng)保持和 ThreadPoolExecutor 相同的調(diào)用方式。
  • 不同于 Mult Thread 中子線程不依賴于主線程的運(yùn)行,而在 Mult Coroutine 中子協(xié)程必須依賴于主協(xié)程,因此主協(xié)程在子協(xié)程沒(méi)有全部運(yùn)行完畢之前不能直接 done 掉。這也解釋了為什么 TaskGroup 官方實(shí)現(xiàn)中沒(méi)有提供類似于 shutdown 之類的方法,而是只提供上下文管理的運(yùn)行方式。

有了上述 3 點(diǎn)的考量,我們決定將 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。這樣的好處在于,作為學(xué)習(xí)者一方面可以了解 ThreadPoolExecutor 的內(nèi)部實(shí)現(xiàn)機(jī)制,另一方面站在巨人肩膀上的編程借鑒往往會(huì)事半功倍,對(duì)于自我的提升也是較為明顯的。

在考慮這些因素的同時(shí),我們將繼續(xù)深入研究協(xié)程池的設(shè)計(jì)和實(shí)現(xiàn)。通過(guò)對(duì)適用范圍和使用方式的明確,我們能更好地把握 CoroutinePoolExecutor 的潛在優(yōu)勢(shì),為異步應(yīng)用的性能提升做出更有針對(duì)性的貢獻(xiàn)。

具體代碼實(shí)現(xiàn)

在這里我先貼出完整的代碼實(shí)現(xiàn),其中著重點(diǎn)已經(jīng)用注釋標(biāo)明。

以下是 CoroutinePoolExecutor 的代碼實(shí)現(xiàn):

import os
import asyncio
import weakref
import logging
import itertools


async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
    try:
        while True:
            work_item = await work_queue.get()

            if work_item is not None:
                await work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    # Notify available coroutines
                    executor._idle_semaphore.release()
                del executor
                continue

            # Notifies the next coroutine task that it is time to exit
            await work_queue.put(None)
            break

    except Exception as exc:
        logging.critical('Exception in worker', exc_info=True)


class _WorkItem:
    def __init__(self, future, coro):
        self.future = future
        self.coro = coro

    async def run(self):
        try:
            result = await self.coro
        except Exception as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class CoroutinePoolExecutor:
    """
    Coroutine pool implemented based on ThreadPoolExecutor
    Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
    So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
    """

    # Used to assign unique thread names when coroutine_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers, coroutine_name_prefix=""):

        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = asyncio.Queue()
        self._idle_semaphore = asyncio.Semaphore(0)
        self._coroutines = set()
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._coroutine_name_prefix = (coroutine_name_prefix or (
            f"{__class__.__name__}-{self._counter()}"
        ))

    async def submit(self, coro):
        async with self._shutdown_lock:
            # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
            # This is because after shutdown, all sub-coroutines will end their work
            # one after another. Even if there are new coroutine tasks, they will not
            # be reactivated.
            if self._shutdown:
                raise RuntimeError('cannot schedule new coroutine task after shutdown')

            f = asyncio.Future()
            w = _WorkItem(
                f,
                coro
            )
            await self._work_queue.put(w)
            await self._adjust_coroutine_count()
            return f

    async def _adjust_coroutine_count(self):

        try:
            # 2 functions:
            # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
            # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
            # Since the Semaphore provided by asyncio does not have a timeout
            # parameter, you can choose to use it with wait_for.
            if await asyncio.wait_for(
                    self._idle_semaphore.acquire(),
                    0
            ):
                return
        except TimeoutError:
            pass

        num_coroutines = len(self._coroutines)
        if num_coroutines < self._max_workers:
            coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
            t = asyncio.create_task(
                coro=_worker(
                    weakref.ref(self),
                    self._work_queue
                ),
                name=coroutine_name
            )

            self._coroutines.add(t)

    async def shutdown(self, wait=True, *, cancel_futures=False):
        async with self._shutdown_lock:
            self._shutdown = True

            if cancel_futures:
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # None is an exit signal, given by the shutdown method, when the shutdown method is called
            # will notify the sub-coroutine to stop working and exit the loop
            await self._work_queue.put(None)

        if wait:
            for t in self._coroutines:
                await t

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.shutdown(wait=True)
        return False

以下是 CoroutinePoolExecutor 的使用方式:

import asyncio

from coroutinepoolexecutor import CoroutinePoolExecutor


async def task(i):
    await asyncio.sleep(1)
    print(f"task-{i}")


async def main():
    async with CoroutinePoolExecutor(2) as executor:
        for i in range(10):
            await executor.submit(task(i))

if __name__ == "__main__":
    asyncio.run(main())

我們知道,在線程池中,工作線程一旦創(chuàng)建會(huì)不斷的領(lǐng)取新的任務(wù)并執(zhí)行,除開 shutdown() 調(diào)用,否則對(duì)于靜態(tài)的線程池來(lái)講工作線程不會(huì)自己結(jié)束。

在上述協(xié)程池代碼實(shí)現(xiàn)中,CoroutinePoolExecutor 類包含了主要的對(duì)外調(diào)用功能的接口、內(nèi)部提供了存儲(chǔ) task 的 Queue、工作協(xié)程自動(dòng)生成 name 的計(jì)數(shù)器、保障協(xié)程的信號(hào)量鎖等等。

而 _worker 函數(shù)是工作協(xié)程的運(yùn)行函數(shù),其會(huì)在工作協(xié)程啟動(dòng)后,不斷的從 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具體執(zhí)行 coro task。

剩下的 _WorkItem 是一個(gè) future 對(duì)象與 coro task 的封裝器,其功能是解耦 future 對(duì)象和 coro task、并在 coro task 運(yùn)行時(shí)和運(yùn)行后設(shè)置 future 的結(jié)果。

對(duì)于異步循環(huán)的思考

在此 CoroutinePoolExecutor 實(shí)現(xiàn)后,我其實(shí)又有了一個(gè)新的思考。Python 的 EventLoop 相較于 Node.js 的 EventLoop 來(lái)說(shuō)其實(shí)更加的底層,它有感的暴露了出來(lái)。

具體體現(xiàn)在當(dāng) Python Event Loop 啟動(dòng)后,如果 main coroutine 停止運(yùn)行,那么所有的 subtask coroutine 也會(huì)停止運(yùn)行,尤其是對(duì)于一些需要清理資源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都會(huì)在某些情況顯得無(wú)措,說(shuō)的更具體點(diǎn)就是不知道在什么時(shí)候調(diào)用。

對(duì)于這些問(wèn)題,我們可以繼承 BaseEventLoop 自己手動(dòng)對(duì) EventLoop 的功能進(jìn)行擴(kuò)展,如在事件循環(huán)關(guān)閉之前添加 hook function,甚至可以限制整個(gè) EventLoop 的 max_workers 或者做成動(dòng)態(tài)的可調(diào)節(jié) coroutine 數(shù)量的 EventLoop 都行。

無(wú)論如何,只要心里有想法,就可以去將它實(shí)現(xiàn) .. 學(xué)習(xí)本身就是一個(gè)不斷挑戰(zhàn)的過(guò)程。

到此這篇關(guān)于Python Coroutine池化的實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Python Coroutine池化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論