欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python異步編程之a(chǎn)syncio低階API的使用詳解

 更新時(shí)間:2024年01月08日 08:44:43   作者:金色旭光  
asyncio中低階API的種類很多,涉及到開(kāi)發(fā)的5個(gè)方面,這篇文章主要為大家詳細(xì)介紹了這些低階API的具體使用,感興趣的小伙伴可以學(xué)習(xí)一下

1.低階API介紹

asyncio中低階API的種類很多,涉及到開(kāi)發(fā)的5個(gè)方面。包括:

  • 獲取事件循環(huán)
  • 事件循環(huán)方法集
  • 傳輸
  • 協(xié)議
  • 事件循環(huán)策略

本篇中只講解asyncio常見(jiàn)常用的函數(shù),很多底層函數(shù)如網(wǎng)絡(luò)、IPC、套接字、信號(hào)等不在本篇范圍。

2.獲取事件循環(huán)

事件循環(huán)是異步中重要的概念之一,用于驅(qū)動(dòng)任務(wù)的執(zhí)行。包含的低階API如下:

函數(shù)功能
asyncio.get_running_loop()獲取當(dāng)前運(yùn)行的事件循環(huán)首選函數(shù)。
asyncio.get_event_loop()獲得一個(gè)事件循環(huán)實(shí)例
asyncio.set_event_loop()將策略設(shè)置到事件循環(huán)
asyncio.new_event_loop()創(chuàng)建一個(gè)新的事件循環(huán)

在asyncio初識(shí)這篇中提到過(guò)事件循環(huán),可以把事件循環(huán)當(dāng)做是一個(gè)while循環(huán),在周期性的運(yùn)行并執(zhí)行一些任務(wù)。這個(gè)說(shuō)法比較抽象,事件循環(huán)本質(zhì)上其實(shí)是能調(diào)用操作系統(tǒng)IO模型的模塊。以Linux系統(tǒng)為例,IO模型有阻塞,非阻塞,IO多路復(fù)用等。asyncio 常用的是IO多路復(fù)用模型的epool和 kqueue。事件循環(huán)原理涉及到異步編程的操作系統(tǒng)原理,后續(xù)更新一系列相關(guān)文章。

get_event_loop()創(chuàng)建一個(gè)事件循環(huán),用于驅(qū)動(dòng)協(xié)程的執(zhí)行

import asyncio

async def demo(i):
    print(f"hello {i}")

def main():
    loop = asyncio.get_event_loop()
    print(loop._selector)
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)

main()

結(jié)果:

<selectors.KqueueSelector object at 0x104eabe20>
hello 1

可以通過(guò)loop._selector屬性獲取到當(dāng)前事件循環(huán)使用的是kqueue模型

獲取循環(huán)

import asyncio

async def demo(i):
    res = asyncio.get_running_loop()
    print(res)
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)
main()

結(jié)果:

<_UnixSelectorEventLoop running=True closed=False debug=False>
hello 1

推薦使用asyncio.run 創(chuàng)建事件循環(huán),底層API主要用于庫(kù)的編寫。

3.生命周期

生命周期是用于管理任務(wù)的啟停的函數(shù),如下:

函數(shù)功能
loop.run_until_complete()運(yùn)行一個(gè)期程/任務(wù)/可等待對(duì)象直到完成。
loop.run_forever()一直運(yùn)行事件循環(huán),直到被顯示停止
loop.stop()停止事件循環(huán)
loop.close()關(guān)閉事件循環(huán)
loop.is_running()返回 True , 如果事件循環(huán)正在運(yùn)行
loop.is_closed()返回 True ,如果事件循環(huán)已經(jīng)被關(guān)閉
await loop.shutdown_asyncgens()關(guān)閉異步生成器

run_until_complete

運(yùn)行一個(gè)期程/任務(wù)/可等待對(duì)象直到完成。run_until_complete的參數(shù)是一個(gè)futrue對(duì)象。當(dāng)傳入一個(gè)協(xié)程,其內(nèi)部會(huì)自動(dòng)封裝成task。run_until_complete()是會(huì)自動(dòng)關(guān)閉事件循環(huán)的函數(shù),區(qū)別于run_forever()是需要手動(dòng)關(guān)閉事件循環(huán)的函數(shù)。

import asyncio 


async def demo(i):
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    
    task = loop.create_task(demo(1))

    # 傳入的是一個(gè)任務(wù)
    loop.run_until_complete(task)

    # 傳入的是一個(gè)協(xié)程也可以
    loop.run_until_complete(demo(20))


main()

結(jié)果:

hello 1
hello 20

4.調(diào)試

函數(shù)功能
loop.set_debug()開(kāi)啟或禁用調(diào)試模式
loop.get_debug()獲取當(dāng)前測(cè)試模式

5.調(diào)度回調(diào)函數(shù)

在異步編程中回調(diào)函數(shù)是一種很常見(jiàn)的方法,想要在事件循環(huán)中增加一些回調(diào)函數(shù),可以有如下方法:

函數(shù)功能
loop.call_soon()盡快調(diào)用回調(diào)。
loop.call_soon_threadsafe()loop.call_soon() 方法線程安全的變體。
loop.call_later()在給定時(shí)間之后調(diào)用回調(diào)函數(shù)。
loop.call_at()在指定的時(shí)間調(diào)用回調(diào)函數(shù)。

這些回調(diào)函數(shù)既可以回調(diào)普通函數(shù)也可以回調(diào)協(xié)程函數(shù)。

call_soon函數(shù)原型:

loop.call_soon(callback, *args, context=None)

示例:

import asyncio

async def my_coroutine():
    print("協(xié)程被執(zhí)行")

async def other_coro():
    print("非call_soon調(diào)用")

def callback_function():
    print("回調(diào)函數(shù)被執(zhí)行")


# 創(chuàng)建一個(gè)事件循環(huán)
loop = asyncio.get_event_loop()

# 使用create_task包裝協(xié)程函數(shù),并調(diào)度執(zhí)行
loop.call_soon(loop.create_task, my_coroutine())

# 調(diào)度一個(gè)常規(guī)函數(shù)以盡快執(zhí)行
loop.call_soon(callback_function)

# 啟動(dòng)一個(gè)事件循環(huán)
task = loop.create_task(other_coro())
loop.run_until_complete(task)

結(jié)果:

回調(diào)函數(shù)被執(zhí)行
非call_soon調(diào)用
協(xié)程被執(zhí)行

結(jié)果分析:

call_soon調(diào)用普通函數(shù)直接傳入函數(shù)名作為參數(shù),調(diào)用協(xié)程函數(shù)需要講協(xié)程通過(guò)loop.create_task封裝成task。

6.線程/進(jìn)程池

函數(shù)功能
await loop.run_in_executor()多線程中運(yùn)行一個(gè)阻塞的函數(shù)
loop.set_default_executor()設(shè)置 loop.run_in_executor() 默認(rèn)執(zhí)行器

asyncio.run_in_executor 用于在異步事件循環(huán)中執(zhí)行一個(gè)阻塞的函數(shù)或方法。它將阻塞的調(diào)用委托給一個(gè)線程池或進(jìn)程池,以確保不阻塞主事件循環(huán)??梢杂糜谠趨f(xié)程中調(diào)用一些不支持異步編程的方法,不支持異步編程的模塊。

run_in_executor

import asyncio
import concurrent.futures

def blocking_function():
    # 模擬一個(gè)阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函數(shù)返回"

async def async_function2():
    print("async_function2 start")
    await asyncio.sleep(1)
    print("async_function2 end")

async def async_function():
    print("異步函數(shù)開(kāi)始執(zhí)行。。。")

    print("調(diào)用同步阻塞函數(shù)")
    # 使用run_in_executor調(diào)度執(zhí)行阻塞函數(shù)
    result = await loop.run_in_executor(None, blocking_function)

    print(f"獲取同步函數(shù)的結(jié)果: {result}")

# 創(chuàng)建一個(gè)事件循環(huán)
loop = asyncio.get_event_loop()

# 運(yùn)行異步函數(shù)
loop.run_until_complete(asyncio.gather(async_function(), async_function2()))

結(jié)果:

異步函數(shù)開(kāi)始執(zhí)行。。。
調(diào)用同步阻塞函數(shù)
async_function2 start
async_function2 end
獲取同步函數(shù)的結(jié)果: 阻塞函數(shù)返回

結(jié)果分析:

通過(guò)事件循環(huán)執(zhí)行任務(wù)async_function,在async_function中通過(guò)loop.run_in_executor調(diào)用同步阻塞函數(shù)blocking_function,該阻塞函數(shù)沒(méi)有影響事件循環(huán)中另一個(gè)任務(wù)async_function2的執(zhí)行。

await loop.run_in_executor(None, blocking_function)中None代表使用的是默認(rèn)線程池,也可以替換成其他線程池。

使用自定義線程池和進(jìn)程池

import asyncio
import concurrent.futures

def blocking_function():
    # 模擬一個(gè)阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函數(shù)返回"

async def async_function():
    print("異步函數(shù)開(kāi)始執(zhí)行。。。")

    print("調(diào)用同步阻塞函數(shù)")

    # 線程池
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('線程池調(diào)用返回結(jié)果:', result)

    # 進(jìn)程池
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('進(jìn)程池調(diào)用返回結(jié)果:', result)

if __name__ == '__main__':
    # 創(chuàng)建一個(gè)事件循環(huán)
    loop = asyncio.get_event_loop()

    # 運(yùn)行異步函數(shù)
    loop.run_until_complete(async_function())

結(jié)果:

異步函數(shù)開(kāi)始執(zhí)行。。。
調(diào)用同步阻塞函數(shù)
線程池調(diào)用返回結(jié)果: 阻塞函數(shù)返回
進(jìn)程池調(diào)用返回結(jié)果: 阻塞函數(shù)返回

結(jié)果分析:

通過(guò)線程池concurrent.futures.ThreadPoolExecutor()和進(jìn)程池concurrent.futures.ProcessPoolExecutor()執(zhí)行阻塞函數(shù)。

7.任務(wù)與期程

函數(shù)功能
loop.create_future()創(chuàng)建一個(gè) Future 對(duì)象。
loop.create_task()將協(xié)程當(dāng)作 Task 一樣調(diào)度。
loop.set_task_factory()設(shè)置 loop.create_task() 使用的工廠,它將用來(lái)創(chuàng)建 Tasks 。
loop.get_task_factory()獲取 loop.create_task() 使用的工廠,它用來(lái)創(chuàng)建 Tasks 。

create_futurecreate_future 的功能是創(chuàng)建一個(gè)future對(duì)象。future對(duì)象通常不需要手動(dòng)創(chuàng)建,因?yàn)閠ask會(huì)自動(dòng)管理任務(wù)結(jié)果。相當(dāng)于task是全自動(dòng),創(chuàng)建future是半自動(dòng)。創(chuàng)建的future就需要手動(dòng)的講future狀態(tài)設(shè)置成完成,才能表示task的狀態(tài)為完成。

import asyncio


def foo(future, result):
    print(f"此時(shí)future的狀態(tài):{future}")
    future.set_result(result)
    print(f"此時(shí)future的狀態(tài):{future}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 手動(dòng)創(chuàng)建future對(duì)象
    all_done = loop.create_future()

    # 設(shè)置一個(gè)回調(diào)函數(shù)用于修改設(shè)置future的結(jié)果
    loop.call_soon(foo, all_done, "Future is done!")

    result = loop.run_until_complete(all_done)

    print("返回結(jié)果", result)
    print("獲取future的結(jié)果", all_done.result())

結(jié)果:

此時(shí)future的狀態(tài):<Future pending cb=[_run_until_complete_cb() at /Users/lib/python3.10/asyncio/base_events.py:184]>
此時(shí)future的狀態(tài):<Future finished result='Future is done!'>
返回結(jié)果 Future is done!
獲取future的結(jié)果 Future is done!

結(jié)果分析:

future設(shè)置結(jié)果之后之后,future對(duì)象的狀態(tài)就從pending變成finished狀態(tài)。如果一個(gè)future沒(méi)有手動(dòng)設(shè)置結(jié)果,那么事件循環(huán)就不會(huì)停止。

create_task將協(xié)程封裝成一個(gè)task對(duì)象,事件循環(huán)主要操作的是task對(duì)象。協(xié)程沒(méi)有狀態(tài),而task是有狀態(tài)的。

import asyncio 


async def demo(i):
    print(f"hello {i}")
    await asyncio.sleep(1)

def main():
    loop = asyncio.get_event_loop()

    # 將攜程封裝成task,給事件使用
    task = loop.create_task(demo(1))

    loop.run_until_complete(task)

main()
>>> 
hello 1

asyncio.create_task 和 loop.create_task的區(qū)別:

兩者實(shí)現(xiàn)的功能都是一樣的,將協(xié)程封裝成一個(gè)task,讓協(xié)程擁有了生命周期。區(qū)別僅僅在于使用的方法。asyncio.create_task 是高階API,不需要?jiǎng)?chuàng)建事件循環(huán),而loop.create_task需要先創(chuàng)建事件循環(huán)再使用該方法。

到此這篇關(guān)于python異步編程之a(chǎn)syncio低階API的使用詳解的文章就介紹到這了,更多相關(guān)python asyncio低階API內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論