欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python Coroutine池化的實現(xiàn)詳解

 更新時間:2024年01月29日 13:55:28   作者:.Hanabi  
在當(dāng)今計算機科學(xué)和軟件工程的領(lǐng)域中,池化技術(shù)如線程池、連接池和對象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具,所以下面我們就來看看Coroutine池化的具體實現(xiàn)吧

池化介紹

在當(dāng)今計算機科學(xué)和軟件工程的領(lǐng)域中,池化技術(shù)如線程池、連接池和對象池等已經(jīng)成為優(yōu)化資源利用率和提高軟件性能的重要工具。然而,在 Python 的協(xié)程領(lǐng)域,我們卻很少見到類似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。為什么會這樣呢?

首先,Python Coroutine 的特性使得池化技術(shù)在協(xié)程中的應(yīng)用相對較少。與像 Golang 這樣支持有棧協(xié)程的語言不同,Python Coroutine 是無棧的,無法跨核執(zhí)行,從而限制了協(xié)程池發(fā)揮多核優(yōu)勢的可能性。

其次,Python Coroutine 的輕量級和快速創(chuàng)建銷毀的特性,使得頻繁創(chuàng)建和銷毀協(xié)程并不會帶來顯著的性能損耗。這也解釋了為什么 Python 官方一直沒有引入 CoroutinePoolExecutor。

然而,作為開發(fā)者,我們?nèi)匀豢梢栽谔囟▓鼍跋驴紤]協(xié)程的池化。雖然 Python Coroutine 輕量,但在一些需要大量協(xié)程協(xié)同工作的應(yīng)用中,池化技術(shù)能夠提供更方便、統(tǒng)一的調(diào)度子協(xié)程的方式。尤其是在涉及到異步操作的同時需要控制并發(fā)數(shù)量時,協(xié)程池的優(yōu)勢就顯而易見了。

關(guān)于 Python 官方是否會在未來引入類似于 TaskGroup 的 CoroutinePoolExecutor,這或許是一個懸而未決的問題??紤]到 Python 在異步編程方面的快速發(fā)展,我們不能排除未來可能性的存在?;蛟S有一天,我們會看到 TaskGroup 引入一個 max_workers 的形參,以更好地支持對協(xié)程池的需求。

在實際開發(fā)中,我們也可以嘗試編寫自己的 CoroutinePoolExecutor,以滿足特定業(yè)務(wù)場景的需求。通過合理的設(shè)計架構(gòu)和對數(shù)據(jù)流的全局考慮,我們可以最大程度地發(fā)揮協(xié)程池的優(yōu)勢,提高系統(tǒng)的性能和響應(yīng)速度。

在接下來的文章中,我們將探討如何設(shè)計和實現(xiàn)一個簡單的 CoroutinePoolExecutor,以及在實際項目中的應(yīng)用場景。通過深入理解協(xié)程池的工作原理,我們或許能更好地利用這一技術(shù),使我們的異步應(yīng)用更為高效。

如何開始編寫

如何開始編寫 CoroutinePoolExecutor,首先我們要明確出其適用范疇、考慮到使用方式和其潛在的風(fēng)險點:

  • 它并不適用于 Mult Thread + Mult Event Loop 的場景,因此它并非線程安全的。
  • 應(yīng)當(dāng)保持和 ThreadPoolExecutor 相同的調(diào)用方式。
  • 不同于 Mult Thread 中子線程不依賴于主線程的運行,而在 Mult Coroutine 中子協(xié)程必須依賴于主協(xié)程,因此主協(xié)程在子協(xié)程沒有全部運行完畢之前不能直接 done 掉。這也解釋了為什么 TaskGroup 官方實現(xiàn)中沒有提供類似于 shutdown 之類的方法,而是只提供上下文管理的運行方式。

有了上述 3 點的考量,我們決定將 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。這樣的好處在于,作為學(xué)習(xí)者一方面可以了解 ThreadPoolExecutor 的內(nèi)部實現(xiàn)機制,另一方面站在巨人肩膀上的編程借鑒往往會事半功倍,對于自我的提升也是較為明顯的。

在考慮這些因素的同時,我們將繼續(xù)深入研究協(xié)程池的設(shè)計和實現(xiàn)。通過對適用范圍和使用方式的明確,我們能更好地把握 CoroutinePoolExecutor 的潛在優(yōu)勢,為異步應(yīng)用的性能提升做出更有針對性的貢獻。

具體代碼實現(xiàn)

在這里我先貼出完整的代碼實現(xiàn),其中著重點已經(jīng)用注釋標(biāo)明。

以下是 CoroutinePoolExecutor 的代碼實現(xiàn):

import os
import asyncio
import weakref
import logging
import itertools


async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
    try:
        while True:
            work_item = await work_queue.get()

            if work_item is not None:
                await work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    # Notify available coroutines
                    executor._idle_semaphore.release()
                del executor
                continue

            # Notifies the next coroutine task that it is time to exit
            await work_queue.put(None)
            break

    except Exception as exc:
        logging.critical('Exception in worker', exc_info=True)


class _WorkItem:
    def __init__(self, future, coro):
        self.future = future
        self.coro = coro

    async def run(self):
        try:
            result = await self.coro
        except Exception as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class CoroutinePoolExecutor:
    """
    Coroutine pool implemented based on ThreadPoolExecutor
    Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
    So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
    """

    # Used to assign unique thread names when coroutine_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers, coroutine_name_prefix=""):

        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = asyncio.Queue()
        self._idle_semaphore = asyncio.Semaphore(0)
        self._coroutines = set()
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._coroutine_name_prefix = (coroutine_name_prefix or (
            f"{__class__.__name__}-{self._counter()}"
        ))

    async def submit(self, coro):
        async with self._shutdown_lock:
            # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
            # This is because after shutdown, all sub-coroutines will end their work
            # one after another. Even if there are new coroutine tasks, they will not
            # be reactivated.
            if self._shutdown:
                raise RuntimeError('cannot schedule new coroutine task after shutdown')

            f = asyncio.Future()
            w = _WorkItem(
                f,
                coro
            )
            await self._work_queue.put(w)
            await self._adjust_coroutine_count()
            return f

    async def _adjust_coroutine_count(self):

        try:
            # 2 functions:
            # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
            # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
            # Since the Semaphore provided by asyncio does not have a timeout
            # parameter, you can choose to use it with wait_for.
            if await asyncio.wait_for(
                    self._idle_semaphore.acquire(),
                    0
            ):
                return
        except TimeoutError:
            pass

        num_coroutines = len(self._coroutines)
        if num_coroutines < self._max_workers:
            coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
            t = asyncio.create_task(
                coro=_worker(
                    weakref.ref(self),
                    self._work_queue
                ),
                name=coroutine_name
            )

            self._coroutines.add(t)

    async def shutdown(self, wait=True, *, cancel_futures=False):
        async with self._shutdown_lock:
            self._shutdown = True

            if cancel_futures:
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # None is an exit signal, given by the shutdown method, when the shutdown method is called
            # will notify the sub-coroutine to stop working and exit the loop
            await self._work_queue.put(None)

        if wait:
            for t in self._coroutines:
                await t

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.shutdown(wait=True)
        return False

以下是 CoroutinePoolExecutor 的使用方式:

import asyncio

from coroutinepoolexecutor import CoroutinePoolExecutor


async def task(i):
    await asyncio.sleep(1)
    print(f"task-{i}")


async def main():
    async with CoroutinePoolExecutor(2) as executor:
        for i in range(10):
            await executor.submit(task(i))

if __name__ == "__main__":
    asyncio.run(main())

我們知道,在線程池中,工作線程一旦創(chuàng)建會不斷的領(lǐng)取新的任務(wù)并執(zhí)行,除開 shutdown() 調(diào)用,否則對于靜態(tài)的線程池來講工作線程不會自己結(jié)束。

在上述協(xié)程池代碼實現(xiàn)中,CoroutinePoolExecutor 類包含了主要的對外調(diào)用功能的接口、內(nèi)部提供了存儲 task 的 Queue、工作協(xié)程自動生成 name 的計數(shù)器、保障協(xié)程的信號量鎖等等。

而 _worker 函數(shù)是工作協(xié)程的運行函數(shù),其會在工作協(xié)程啟動后,不斷的從 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具體執(zhí)行 coro task。

剩下的 _WorkItem 是一個 future 對象與 coro task 的封裝器,其功能是解耦 future 對象和 coro task、并在 coro task 運行時和運行后設(shè)置 future 的結(jié)果。

對于異步循環(huán)的思考

在此 CoroutinePoolExecutor 實現(xiàn)后,我其實又有了一個新的思考。Python 的 EventLoop 相較于 Node.js 的 EventLoop 來說其實更加的底層,它有感的暴露了出來。

具體體現(xiàn)在當(dāng) Python Event Loop 啟動后,如果 main coroutine 停止運行,那么所有的 subtask coroutine 也會停止運行,尤其是對于一些需要清理資源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都會在某些情況顯得無措,說的更具體點就是不知道在什么時候調(diào)用。

對于這些問題,我們可以繼承 BaseEventLoop 自己手動對 EventLoop 的功能進行擴展,如在事件循環(huán)關(guān)閉之前添加 hook function,甚至可以限制整個 EventLoop 的 max_workers 或者做成動態(tài)的可調(diào)節(jié) coroutine 數(shù)量的 EventLoop 都行。

無論如何,只要心里有想法,就可以去將它實現(xiàn) .. 學(xué)習(xí)本身就是一個不斷挑戰(zhàn)的過程。

到此這篇關(guān)于Python Coroutine池化的實現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Python Coroutine池化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Zabbix實現(xiàn)微信報警功能

    Zabbix實現(xiàn)微信報警功能

    這篇文章主要介紹了Zabbix實現(xiàn)微信報警的相關(guān)資料,本文圖文并茂介紹的非常詳細,需要的朋友可以參考下
    2016-10-10
  • 一個基于flask的web應(yīng)用誕生 用戶注冊功能開發(fā)(5)

    一個基于flask的web應(yīng)用誕生 用戶注冊功能開發(fā)(5)

    一個基于flask的web應(yīng)用誕生第五篇,這篇文章主要介紹了用戶注冊功能開發(fā),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • python使用代理IP爬取貓眼電影專業(yè)評分?jǐn)?shù)據(jù)

    python使用代理IP爬取貓眼電影專業(yè)評分?jǐn)?shù)據(jù)

    在編寫爬蟲程序的過程中,IP封鎖無疑是一個常見且棘手的問題,盡管網(wǎng)絡(luò)上存在大量的免費IP代理網(wǎng)站,但其質(zhì)量往往參差不齊,令人堪憂,本篇文章中介紹一下如何使用Python的Requests庫和BeautifulSoup庫來抓取貓眼電影網(wǎng)站上的專業(yè)評分?jǐn)?shù)據(jù),需要的朋友可以參考下
    2024-03-03
  • 在Python中處理日期和時間的基本知識點整理匯總

    在Python中處理日期和時間的基本知識點整理匯總

    這篇文章主要介紹了在Python中處理日期和時間的基本知識點整理匯總,是Python入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-05-05
  • 對python_discover方法遍歷所有執(zhí)行的用例詳解

    對python_discover方法遍歷所有執(zhí)行的用例詳解

    今天小編就為大家分享一篇對python_discover方法遍歷所有執(zhí)行的用例詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-02-02
  • Python使用LRU緩存策略進行緩存的方法步驟

    Python使用LRU緩存策略進行緩存的方法步驟

    本文主要介紹了Python使用LRU緩存策略進行緩存的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-05-05
  • Python編程實現(xiàn)簡易的音樂播放器基本操作

    Python編程實現(xiàn)簡易的音樂播放器基本操作

    這篇文章主要來教大家利用Python編程來實現(xiàn)一個簡易的音樂播放器,文中含有基本功能的操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2021-10-10
  • python數(shù)字圖像處理之圖像簡單濾波實現(xiàn)

    python數(shù)字圖像處理之圖像簡單濾波實現(xiàn)

    這篇文章主要為大家介紹了python數(shù)字圖像處理之圖像簡單濾波實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • python命令行參數(shù)用法實例分析

    python命令行參數(shù)用法實例分析

    這篇文章主要介紹了python命令行參數(shù)用法,結(jié)合實例形式分析了Python基于optparse模塊處理命令行參數(shù)相關(guān)使用技巧,需要的朋友可以參考下
    2019-06-06
  • Python3中省略號(...)用法介紹

    Python3中省略號(...)用法介紹

    本文主要介紹了Python3中省略號(...)用法介紹,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02

最新評論