欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python協(xié)程庫(kù)asyncio(異步io)問(wèn)題

 更新時(shí)間:2023年11月10日 14:50:37   作者:少年白char  
這篇文章主要介紹了python協(xié)程庫(kù)asyncio(異步io)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

介紹

  • 異步IO:就是發(fā)起一個(gè)IO操作(如:網(wǎng)絡(luò)請(qǐng)求,文件讀寫(xiě)等),這些操作一般是比較耗時(shí)的,不用等待它結(jié)束,可以繼續(xù)做其他事情,結(jié)束時(shí)會(huì)發(fā)來(lái)通知。
  • 協(xié)程:又稱(chēng)為微線程,在一個(gè)線程中執(zhí)行,執(zhí)行函數(shù)時(shí)可以隨時(shí)中斷,由程序(用戶(hù))自身控制,執(zhí)行效率極高,與多線程比較,沒(méi)有切換線程的開(kāi)銷(xiāo)和多線程鎖機(jī)制。

asyncio中幾個(gè)重要概念

1.事件循環(huán)

事件循環(huán)是每個(gè) asyncio 應(yīng)用的核心,管理所有的事件,在整個(gè)程序運(yùn)行過(guò)程中不斷循環(huán)執(zhí)行并追蹤事件發(fā)生的順序?qū)⑺鼈兎旁陉?duì)列中,空閑時(shí)調(diào)用相應(yīng)的事件處理者來(lái)處理這些事件。

創(chuàng)建事件循環(huán)

loop = asyncio.get_event_loop()

獲取當(dāng)前事件循環(huán)。

如果當(dāng)前 OS 線程沒(méi)有設(shè)置當(dāng)前事件循環(huán)并且 set_event_loop() 還沒(méi)有被調(diào)用,asyncio 將創(chuàng)建一個(gè)新的事件循環(huán)并將其設(shè)置為當(dāng)前循環(huán)。

另起一個(gè)線程創(chuàng)建事件循環(huán)

from threading import Thread
import asyncio

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True) # 守護(hù)線程
loop_thread.start()

2.Future

Future對(duì)象表示尚未完成的計(jì)算,還未完成的結(jié)果,它和task上沒(méi)有本質(zhì)上的區(qū)別

3.Task

是Future的子類(lèi),作用是在運(yùn)行某個(gè)任務(wù)的同時(shí)可以并發(fā)的運(yùn)行多個(gè)任務(wù)。asyncio.Task用于實(shí)現(xiàn)協(xié)作式多任務(wù)的庫(kù),且Task對(duì)象不能用戶(hù)手動(dòng)實(shí)例化,通過(guò)下面2個(gè)函數(shù)創(chuàng)建:loop.create_task() 或 asyncio.ensure_future()

  • loop.create_task() ,要在定義loop對(duì)象之后,調(diào)用將方法對(duì)象轉(zhuǎn)化成了task的對(duì)象
  • asyncio.ensure_future() 直接調(diào)用asyncio 的ensure_future() 方法,返回的也是task 對(duì)象(我們還沒(méi)有聲明 loop 也可以提前定義好 task 對(duì)象)

4.async/await 關(guān)鍵字

asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,async定義一個(gè)協(xié)程,await用于掛起阻塞的異步調(diào)用接口。對(duì)于異步io你需要知道的重點(diǎn),要注意的是,await語(yǔ)法只能出現(xiàn)在通過(guò)async修飾的函數(shù)中,否則會(huì)報(bào)SyntaxError錯(cuò)誤。而且await后面的對(duì)象需要是一個(gè)Awaitable,或者實(shí)現(xiàn)了相關(guān)的協(xié)議。

注意

  • 所有需要異步執(zhí)行的函數(shù),都需要asyncio中的輪詢(xún)器去輪詢(xún)執(zhí)行,如果函數(shù)阻塞,輪詢(xún)器就會(huì)去執(zhí)行下一個(gè)函數(shù)。所以所有需要異步執(zhí)行的函數(shù)都需要加入到這個(gè)輪詢(xún)器中。
  • 若在協(xié)程中需要有延時(shí)操作,應(yīng)該使用 await asyncio.sleep(),而不是使用time.sleep(),因?yàn)槭褂胻ime.sleep()后會(huì)釋放GIL,阻塞整個(gè)主線程,從而阻塞整個(gè)事件循環(huán)。

創(chuàng)建一個(gè)協(xié)程

使用async可以定義協(xié)程對(duì)象,使用await可以針對(duì)耗時(shí)的操作進(jìn)行掛起,就像生成器里的yield一樣,函數(shù)讓出控制權(quán)。協(xié)程遇到await,事件循環(huán)將會(huì)掛起該協(xié)程,執(zhí)行別的協(xié)程,直到其他的協(xié)程也掛起或者執(zhí)行完畢,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行

耗時(shí)的操作一般是一些IO操作,例如網(wǎng)絡(luò)請(qǐng)求,文件讀取等。我們使用asyncio.sleep函數(shù)來(lái)模擬IO操作。協(xié)程的目的也是讓這些IO操作異步化。

簡(jiǎn)單例子

import asyncio
 
async def execute(x):
    print('Number:', x)
 
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
 
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

首先我們引入了 asyncio 這個(gè)包,這樣我們才可以使用 async 和 await,然后我們使用 async 定義了一個(gè) execute() 方法,方法接收一個(gè)數(shù)字參數(shù),方法執(zhí)行之后會(huì)打印這個(gè)數(shù)字。

隨后我們直接調(diào)用了這個(gè)方法,然而這個(gè)方法并沒(méi)有執(zhí)行,而是返回了一個(gè) coroutine 協(xié)程對(duì)象。隨后我們使用 get_event_loop() 方法創(chuàng)建了一個(gè)事件循環(huán) loop,并調(diào)用了 loop 對(duì)象的 run_until_complete() 方法將協(xié)程注冊(cè)到事件循環(huán) loop 中,然后啟動(dòng)。最后我們才看到了 execute() 方法打印了輸出結(jié)果。

可見(jiàn),async 定義的方法就會(huì)變成一個(gè)無(wú)法直接執(zhí)行的 coroutine 對(duì)象,必須將其注冊(cè)到事件循環(huán)中才可以執(zhí)行。

進(jìn)階例子

多個(gè)任務(wù),定義一個(gè)task列表,使用asyncio.gather(*tasks) 或 asyncio.wait(tasks) 接收

import asyncio
import time

now = lambda: time.time()

"""
asyncio.gather主要集中在收集結(jié)果上。它等待一堆task并按給定的順序返回結(jié)果。

asyncio.wait等待task。而不是直接給你結(jié)果,它提供完成和待處理的任務(wù)。你必須手工收集結(jié)果。
asyncio.wait(tasks) ps:asyncio.wait([1,2,3]) 也可以使用 asyncio.gather(*tasks) ps: asyncio.gather(1,2,3),前者接受一個(gè)task列表,后者接收一堆task。
"""


# 定義一個(gè)異步任務(wù)
async def do_some_work(x):
    print("waiting:", x)
    # 模擬io阻塞
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main(loop):
    """
    :param loop: loop.create_task(需要傳進(jìn)loop參數(shù))
    :return: None
    """
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    # asyncio.ensure_future
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    # loop.create_task(需要傳進(jìn)loop參數(shù))
    # tasks = [
    #     loop.create_task(coroutine1),
    #     loop.create_task(coroutine2),
    #     loop.create_task(coroutine3)
    # ]
    # 返回 完成的 task object
    dones, pendings = await asyncio.wait(tasks)
    print(dones, pendings)
    for task in dones:
        print("Task ret:", task.result())

    # 返回 task 方法的 返回值
    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:",result)


start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("Time:", now() - start)

gather和wait 的區(qū)別

把多個(gè)協(xié)程注冊(cè)進(jìn)一個(gè)事件循環(huán)中的兩種方法

使用方式區(qū)別

1.使用asyncio.wait()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2.使用asyncio.gather()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks)) # *接收args參數(shù) 

接收參數(shù)區(qū)別

asyncio.wait

參數(shù)必須是list對(duì)象 ,list 對(duì)象存放多個(gè) task object

用asyncio.ensure_future轉(zhuǎn)為task對(duì)象

tasks=[
       asyncio.ensure_future(coroutine1),
       asyncio.ensure_future(coroutine2),
       asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

不轉(zhuǎn)為task對(duì)象

loop = asyncio.get_event_loop()

tasks=[
       coroutine1,
       coroutine2,
       coroutine3
]

loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather

必須用 * 來(lái)接收 list 對(duì)象

tasks=[
       asyncio.ensure_future(coroutine1),
       asyncio.ensure_future(coroutine2),
       asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

返回結(jié)果區(qū)別

asyncio.wait

asyncio.wait返回donespendings

  • dones:表示已經(jīng)完成的任務(wù)
  • pendings:表示未完成的任務(wù)

我們需要手動(dòng)去獲取結(jié)果

dones, pendings = await asyncio.wait(tasks)
    print(dones, pendings)
    for task in dones:
        print("Task ret:", task.result())

asyncio.gather

它的返回值就是 return的結(jié)果 ,不用再task.result() 來(lái)獲取

# 返回 task 方法的 返回值
    results = await asyncio.gather(*tasks)
    for result in results:
         print("Task ret:",result)

另 asyncio.wait 帶有控制功能

【控制運(yùn)行任務(wù)數(shù)】:運(yùn)行第一個(gè)任務(wù)就返回

  • FIRST_COMPLETED :第一個(gè)任務(wù)完全返回
  • FIRST_EXCEPTION:產(chǎn)生第一個(gè)異常返回
  • ALL_COMPLETED:所有任務(wù)完成返回 (默認(rèn)選項(xiàng))
import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

# 第一次wait 完成情況
print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) # 第一個(gè)任務(wù)完全返回

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

# 繼續(xù)第一次未完成任務(wù)
print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2)) # 超時(shí)2s 返回

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

# 繼續(xù)第2次未完成任務(wù)
print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2)) # ALL_COMPLETED:所有任務(wù)完成返回 (默認(rèn)項(xiàng))

for task in finished3:
    print(task.result())

loop.close()

動(dòng)態(tài)添加協(xié)程

很多時(shí)候,我們的事件循環(huán)用于注冊(cè)協(xié)程,而有的協(xié)程需要?jiǎng)討B(tài)的添加到事件循環(huán)中。一個(gè)簡(jiǎn)單的方式就是使用多線程。當(dāng)前線程創(chuàng)建一個(gè)事件循環(huán),然后在新建一個(gè)線程,在新線程中啟動(dòng)事件循環(huán)。當(dāng)前線程不會(huì)被block

相關(guān)函數(shù)介紹:

  • loop.call_soon_threadsafe() :與 call_soon()類(lèi)似,等待此函數(shù)返回后馬上調(diào)用回調(diào)函數(shù),返回值是一個(gè) asyncio.Handle 對(duì)象,此對(duì)象內(nèi)只有一個(gè)方法為 cancel()方法,用來(lái)取消回調(diào)函數(shù)。
  • loop.call_soon() : 與call_soon_threadsafe()類(lèi)似,call_soon_threadsafe() 是線程安全的
  • loop.call_later():延遲多少秒后執(zhí)行回調(diào)函數(shù)
  • loop.call_at():在指定時(shí)間執(zhí)行回調(diào)函數(shù),這里的時(shí)間統(tǒng)一使用 loop.time() 來(lái)替代 time.sleep()
  • asyncio.run_coroutine_threadsafe(): 動(dòng)態(tài)的加入?yún)f(xié)程,參數(shù)為一個(gè)回調(diào)函數(shù)和一個(gè)loop對(duì)象,返回值為future對(duì)象,通過(guò)future.result()獲取回調(diào)函數(shù)返回值

動(dòng)態(tài)添加協(xié)程同步方式通過(guò)調(diào)用 call_soon_threadsafe()函數(shù),傳入一個(gè)回調(diào)函數(shù)callback和一個(gè)位置參數(shù)

注意:同步方式,回調(diào)函數(shù) more_work()為普通函數(shù)

import asyncio
from threading import Thread
import time

now = lambda: time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))


start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
print('here')

啟動(dòng)上述代碼之后,當(dāng)前線程不會(huì)被block,新線程中會(huì)按照順序執(zhí)行call_soon_threadsafe方法注冊(cè)的more_work方法, 后者因?yàn)閠ime.sleep操作是同步阻塞的,因此運(yùn)行完畢more_work需要大致6 + 3

異步方式

import asyncio
import time
from threading import Thread

now = lambda: time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))


start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中創(chuàng)建一個(gè)new_loop,然后在另外的子線程中開(kāi)啟一個(gè)無(wú)限事件循環(huán)。

主線程通過(guò)run_coroutine_threadsafe新注冊(cè)協(xié)程對(duì)象。這樣就能在子線程中進(jìn)行事件循環(huán)的并發(fā)操作,同時(shí)主線程又不會(huì)被block。一共執(zhí)行的時(shí)間大概在6s左右。

協(xié)程的停止

future對(duì)象有幾個(gè)狀態(tài):

  • Pending
  • Running
  • Done
  • Cacelled

創(chuàng)建future的時(shí)候,task為pending,事件循環(huán)調(diào)用執(zhí)行的時(shí)候當(dāng)然就是running,調(diào)用完畢自然就是done,如果需要停止事件循環(huán),就需要先把task取消??梢允褂胊syncio.Task獲取事件循環(huán)的task

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:", now() - start)

啟動(dòng)事件循環(huán)之后,馬上ctrl+c,會(huì)觸發(fā)run_until_complete的執(zhí)行異常 KeyBorardInterrupt。然后通過(guò)循環(huán)asyncio.Task取消future。

True表示cannel成功,loop stop之后還需要再次開(kāi)啟事件循環(huán),最后在close,不然還會(huì)拋出異常

循環(huán)task,逐個(gè)cancel是一種方案,可是正如上面我們把task的列表封裝在main函數(shù)中,main函數(shù)外進(jìn)行事件循環(huán)的調(diào)用。這個(gè)時(shí)候,main相當(dāng)于最外出的一個(gè)task,那么處理包裝的main函數(shù)即可。

協(xié)程中生產(chǎn)-消費(fèi)模型設(shè)計(jì)

通過(guò)上面的動(dòng)態(tài)添加協(xié)程的思想,我們可以設(shè)計(jì)一個(gè)生產(chǎn)-消費(fèi)的模型,至于中間件(管道)是什么無(wú)所謂,下面以?xún)?nèi)置隊(duì)列和redis隊(duì)列來(lái)舉例說(shuō)明。

提示:若想主線程退出時(shí),子線程也隨之退出,需要將子線程設(shè)置為守護(hù)線程,函數(shù) setDaemon(True)

import asyncio
from threading import Thread
from collections import deque
import random
import time

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def consumer():
    while True:
        if dq:
            msg = dq.pop()
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop)


async def thread_example(name):
    print('正在執(zhí)行name:', name)
    await asyncio.sleep(2)
    return '返回結(jié)果:' + name



dq = deque()

new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

consumer_thread = Thread(target= consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()

while True:
    i = random.randint(1, 10)
    dq.appendleft(str(i))
    time.sleep(2)

redis隊(duì)列模型

生產(chǎn)者代碼:

import redis

conn_pool = redis.ConnectionPool(host='127.0.0.1')
redis_conn = redis.Redis(connection_pool=conn_pool)

redis_conn.lpush('coro_test', '1')
redis_conn.lpush('coro_test', '2')
redis_conn.lpush('coro_test', '3')
redis_conn.lpush('coro_test', '4')

消費(fèi)者代碼:

import asyncio
from threading import Thread
import redis

def get_redis():
    conn_pool = redis.ConnectionPool(host= '127.0.0.1')
    return redis.Redis(connection_pool= conn_pool)

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print('正在執(zhí)行name:', name)
    await asyncio.sleep(2)
    return '返回結(jié)果:' + name


redis_conn = get_redis()

new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

#循環(huán)接收redis消息并動(dòng)態(tài)加入?yún)f(xié)程
while True:
    msg = redis_conn.rpop('coro_test')
    if msg:
        asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)

asyncio在aiohttp中的應(yīng)用

aiohttp是一個(gè)異步庫(kù),分為客戶(hù)端和服務(wù)端,下面只是簡(jiǎn)單對(duì)客戶(hù)端做個(gè)介紹以及一個(gè)經(jīng)常遇到的異常情況。

aiohttp客戶(hù)端為異步網(wǎng)絡(luò)請(qǐng)求庫(kù)

import asyncio
import aiohttp

count = 0

async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)

def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()

aiohttp并發(fā)量太大的異常解決方案

在使用aiohttp客戶(hù)端進(jìn)行大量并發(fā)請(qǐng)求時(shí),程序會(huì)拋出 ValueError: too many file descriptors in select() 的錯(cuò)誤。

異常代碼示例

說(shuō)明:測(cè)試機(jī)器為windows系統(tǒng)

import asyncio
import aiohttp

count = 0

async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)

def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()

原因分析:使用aiohttp時(shí),python內(nèi)部會(huì)使用select(),操作系統(tǒng)對(duì)文件描述符最大數(shù)量有限制,linux為1024個(gè),windows為509個(gè)。

解決方案:

最常見(jiàn)的解決方案是:限制并發(fā)數(shù)量(一般500),若并發(fā)的量不大可不作限制。其他方案這里不做介紹,如windows下使用loop = asyncio.ProactorEventLoop() 以及使用回調(diào)方式等

限制并發(fā)數(shù)量方法

提示:此方法也可用來(lái)作為異步爬蟲(chóng)的限速方法(反反爬)

使用semaphore = asyncio.Semaphore(500) 以及在協(xié)程中使用 async with semaphore: 操作

具體代碼如下:

import asyncio
import aiohttp


async def get_http(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as res:
                global count
                count += 1
                print(count, res.status)

if __name__ == '__main__':
    count = 0

    semaphore = asyncio.Semaphore(500)
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

在線程或進(jìn)程池中執(zhí)行代碼

在《流暢的python》中有這樣一段話。

函數(shù)(例如io讀寫(xiě),requests網(wǎng)絡(luò)請(qǐng)求)阻塞了客戶(hù)代碼與asycio事件循環(huán)的唯一線程,因此在執(zhí)行調(diào)用時(shí),整個(gè)應(yīng)用程序都會(huì)凍結(jié)。這個(gè)問(wèn)題的解決方法是,使用事件循環(huán)對(duì)象的 run_in_executor方法。

asyncio的事件循環(huán)在背后維護(hù)著一個(gè)ThreadPoolExecutor對(duì)象,我們可以調(diào)用run_in_executor方法,把可調(diào)用對(duì)象發(fā)給它執(zhí)行。

import asyncio
from time import sleep, strftime
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=5)


async def blocked_sleep(name, t):
    print(strftime('[%H:%M:%S]'), end=' ')
    print('sleep {} is running {}s'.format(name, t))
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(executor, sleep, t)
    print(strftime('[%H:%M:%S]'), end=' ')
    print('sleep {} is end'.format(name))
    return t


async def main():
    future = (blocked_sleep(i, i) for i in range(1, 6))
    fs = asyncio.gather(*future)
    return await fs


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
print('results: {}'.format(results))

在同一個(gè)線程里,兩個(gè) event loop 無(wú)法同時(shí) run,但這不能阻止您用兩個(gè)線程分別跑兩個(gè) event loop,

其次再說(shuō) ThreadPoolExecutor。您也可以看到,它根本不是 asyncio 庫(kù)的東西。當(dāng)您創(chuàng)建一個(gè) ThreadPoolExecutor 對(duì)象時(shí),您實(shí)際上是創(chuàng)建了一個(gè)線程池。

僅此而已,與 asyncio、event loop 并無(wú)瓜葛。而當(dāng)您明確使用一個(gè) event loop 的 run_in_executor() 方法時(shí),其實(shí)底層做的只有兩件事:

1,用線程池執(zhí)行給定函數(shù),與 asyncio 毫無(wú)關(guān)系;

2,給線程池執(zhí)行結(jié)果增加一個(gè)回調(diào),該回調(diào)會(huì)在 event loop 的下一次循環(huán)中保存執(zhí)行結(jié)果。

所以 run_in_executor() 只是將傳統(tǒng)的線程池結(jié)果拉回到給定 event loop 中,以便進(jìn)一步處理而已,不存在誰(shuí)共享誰(shuí)的關(guān)系,指定誰(shuí)是誰(shuí)。您可以嘗試一下,在多個(gè)線程中跑多個(gè) event loop,然后都向同一個(gè)線程池扔任務(wù),然后返回結(jié)果:

import asyncio
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor

e = ThreadPoolExecutor()


def worker(index):
    print(index, 'before:', time.strftime('%X'))
    time.sleep(1)
    print(index, 'after:', time.strftime('%X'))
    return index


def main(index):
    loop = asyncio.new_event_loop()
    res = loop.run_until_complete(loop.run_in_executor(e, worker, index))
    print('Thread', index, 'got result', res)


threads = []
for i in range(5):
    t = threading.Thread(target=main, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

不同于上面的方法,這里是把阻塞的方法放到新的線程里跑。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析

    基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析

    這篇文章主要介紹了基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 對(duì)Django外鍵關(guān)系的描述

    對(duì)Django外鍵關(guān)系的描述

    今天小編就為大家分享一篇對(duì)Django外鍵關(guān)系的描述,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-07-07
  • Python實(shí)現(xiàn)統(tǒng)計(jì)代碼行的方法分析

    Python實(shí)現(xiàn)統(tǒng)計(jì)代碼行的方法分析

    這篇文章主要介紹了Python實(shí)現(xiàn)統(tǒng)計(jì)代碼行的方法,結(jié)合實(shí)例形式分析了Python針對(duì)代碼行數(shù)的計(jì)算實(shí)現(xiàn)步驟與操作技巧,需要的朋友可以參考下
    2017-07-07
  • 使用pyecharts在jupyter notebook上繪圖

    使用pyecharts在jupyter notebook上繪圖

    這篇文章主要介紹了使用pyecharts在jupyter notebook上繪圖,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2017-07-07
  • pytorch模型的保存和加載、checkpoint操作

    pytorch模型的保存和加載、checkpoint操作

    這篇文章主要介紹了pytorch模型的保存和加載、checkpoint操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Python re.split方法分割字符串的實(shí)現(xiàn)示例

    Python re.split方法分割字符串的實(shí)現(xiàn)示例

    本文主要介紹了Python re.split方法分割字符串的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • python使用SimpleXMLRPCServer實(shí)現(xiàn)簡(jiǎn)單的rpc過(guò)程

    python使用SimpleXMLRPCServer實(shí)現(xiàn)簡(jiǎn)單的rpc過(guò)程

    這篇文章主要介紹了python使用SimpleXMLRPCServer實(shí)現(xiàn)簡(jiǎn)單的rpc過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • python游戲地圖最短路徑求解

    python游戲地圖最短路徑求解

    這篇文章主要為大家詳細(xì)介紹了python游戲地圖最短路徑的求解,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • python re.sub()替換正則的匹配內(nèi)容方法

    python re.sub()替換正則的匹配內(nèi)容方法

    今天小編就為大家分享一篇python re.sub()替換正則的匹配內(nèi)容方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-07-07
  • Python實(shí)戰(zhàn)之能監(jiān)控文件變化的神器—看門(mén)狗

    Python實(shí)戰(zhàn)之能監(jiān)控文件變化的神器—看門(mén)狗

    這篇文章主要介紹了Python實(shí)戰(zhàn)之能監(jiān)控文件變化的神器—看門(mén)狗,文中有非常詳細(xì)的圖文及代碼示例,對(duì)正在學(xué)習(xí)python的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-05-05

最新評(píng)論