python協(xié)程庫(kù)asyncio(異步io)問(wèn)題
介紹
- 異步IO:就是發(fā)起一個(gè)IO操作(如:網(wǎng)絡(luò)請(qǐng)求,文件讀寫(xiě)等),這些操作一般是比較耗時(shí)的,不用等待它結(jié)束,可以繼續(xù)做其他事情,結(jié)束時(shí)會(huì)發(fā)來(lái)通知。
- 協(xié)程:又稱(chēng)為微線程,在一個(gè)線程中執(zhí)行,執(zhí)行函數(shù)時(shí)可以隨時(shí)中斷,由程序(用戶(hù))自身控制,執(zhí)行效率極高,與多線程比較,沒(méi)有切換線程的開(kāi)銷(xiāo)和多線程鎖機(jī)制。
asyncio中幾個(gè)重要概念
1.事件循環(huán)
事件循環(huán)是每個(gè) asyncio 應(yīng)用的核心,管理所有的事件,在整個(gè)程序運(yùn)行過(guò)程中不斷循環(huán)執(zhí)行并追蹤事件發(fā)生的順序?qū)⑺鼈兎旁陉?duì)列中,空閑時(shí)調(diào)用相應(yīng)的事件處理者來(lái)處理這些事件。
創(chuàng)建事件循環(huán)
loop = asyncio.get_event_loop()
獲取當(dāng)前事件循環(huán)。
如果當(dāng)前 OS 線程沒(méi)有設(shè)置當(dāng)前事件循環(huán)并且 set_event_loop() 還沒(méi)有被調(diào)用,asyncio 將創(chuàng)建一個(gè)新的事件循環(huán)并將其設(shè)置為當(dāng)前循環(huán)。
另起一個(gè)線程創(chuàng)建事件循環(huán)
from threading import Thread import asyncio def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() new_loop = asyncio.new_event_loop() loop_thread = Thread(target=start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) # 守護(hù)線程 loop_thread.start()
2.Future
Future對(duì)象表示尚未完成的計(jì)算,還未完成的結(jié)果,它和task上沒(méi)有本質(zhì)上的區(qū)別
3.Task
是Future的子類(lèi),作用是在運(yùn)行某個(gè)任務(wù)的同時(shí)可以并發(fā)的運(yùn)行多個(gè)任務(wù)。asyncio.Task用于實(shí)現(xiàn)協(xié)作式多任務(wù)的庫(kù),且Task對(duì)象不能用戶(hù)手動(dòng)實(shí)例化,通過(guò)下面2個(gè)函數(shù)創(chuàng)建:loop.create_task() 或 asyncio.ensure_future()
- loop.create_task() ,要在定義loop對(duì)象之后,調(diào)用將方法對(duì)象轉(zhuǎn)化成了task的對(duì)象
- asyncio.ensure_future() 直接調(diào)用asyncio 的ensure_future() 方法,返回的也是task 對(duì)象(我們還沒(méi)有聲明 loop 也可以提前定義好 task 對(duì)象)
4.async/await 關(guān)鍵字
asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,async定義一個(gè)協(xié)程,await用于掛起阻塞的異步調(diào)用接口。對(duì)于異步io你需要知道的重點(diǎn),要注意的是,await語(yǔ)法只能出現(xiàn)在通過(guò)async修飾的函數(shù)中,否則會(huì)報(bào)SyntaxError錯(cuò)誤。而且await后面的對(duì)象需要是一個(gè)Awaitable,或者實(shí)現(xiàn)了相關(guān)的協(xié)議。
注意
- 所有需要異步執(zhí)行的函數(shù),都需要asyncio中的輪詢(xún)器去輪詢(xún)執(zhí)行,如果函數(shù)阻塞,輪詢(xún)器就會(huì)去執(zhí)行下一個(gè)函數(shù)。所以所有需要異步執(zhí)行的函數(shù)都需要加入到這個(gè)輪詢(xún)器中。
- 若在協(xié)程中需要有延時(shí)操作,應(yīng)該使用 await asyncio.sleep(),而不是使用time.sleep(),因?yàn)槭褂胻ime.sleep()后會(huì)釋放GIL,阻塞整個(gè)主線程,從而阻塞整個(gè)事件循環(huán)。
創(chuàng)建一個(gè)協(xié)程
使用async可以定義協(xié)程對(duì)象,使用await可以針對(duì)耗時(shí)的操作進(jìn)行掛起,就像生成器里的yield一樣,函數(shù)讓出控制權(quán)。協(xié)程遇到await,事件循環(huán)將會(huì)掛起該協(xié)程,執(zhí)行別的協(xié)程,直到其他的協(xié)程也掛起或者執(zhí)行完畢,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行
耗時(shí)的操作一般是一些IO操作,例如網(wǎng)絡(luò)請(qǐng)求,文件讀取等。我們使用asyncio.sleep函數(shù)來(lái)模擬IO操作。協(xié)程的目的也是讓這些IO操作異步化。
簡(jiǎn)單例子
import asyncio async def execute(x): print('Number:', x) coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop')
首先我們引入了 asyncio 這個(gè)包,這樣我們才可以使用 async 和 await,然后我們使用 async 定義了一個(gè) execute() 方法,方法接收一個(gè)數(shù)字參數(shù),方法執(zhí)行之后會(huì)打印這個(gè)數(shù)字。
隨后我們直接調(diào)用了這個(gè)方法,然而這個(gè)方法并沒(méi)有執(zhí)行,而是返回了一個(gè) coroutine 協(xié)程對(duì)象。隨后我們使用 get_event_loop() 方法創(chuàng)建了一個(gè)事件循環(huán) loop,并調(diào)用了 loop 對(duì)象的 run_until_complete() 方法將協(xié)程注冊(cè)到事件循環(huán) loop 中,然后啟動(dòng)。最后我們才看到了 execute() 方法打印了輸出結(jié)果。
可見(jiàn),async 定義的方法就會(huì)變成一個(gè)無(wú)法直接執(zhí)行的 coroutine 對(duì)象,必須將其注冊(cè)到事件循環(huán)中才可以執(zhí)行。
進(jìn)階例子
多個(gè)任務(wù),定義一個(gè)task列表,使用asyncio.gather(*tasks) 或 asyncio.wait(tasks) 接收
import asyncio import time now = lambda: time.time() """ asyncio.gather主要集中在收集結(jié)果上。它等待一堆task并按給定的順序返回結(jié)果。 asyncio.wait等待task。而不是直接給你結(jié)果,它提供完成和待處理的任務(wù)。你必須手工收集結(jié)果。 asyncio.wait(tasks) ps:asyncio.wait([1,2,3]) 也可以使用 asyncio.gather(*tasks) ps: asyncio.gather(1,2,3),前者接受一個(gè)task列表,后者接收一堆task。 """ # 定義一個(gè)異步任務(wù) async def do_some_work(x): print("waiting:", x) # 模擬io阻塞 await asyncio.sleep(x) return "Done after {}s".format(x) async def main(loop): """ :param loop: loop.create_task(需要傳進(jìn)loop參數(shù)) :return: None """ coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # asyncio.ensure_future tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] # loop.create_task(需要傳進(jìn)loop參數(shù)) # tasks = [ # loop.create_task(coroutine1), # loop.create_task(coroutine2), # loop.create_task(coroutine3) # ] # 返回 完成的 task object dones, pendings = await asyncio.wait(tasks) print(dones, pendings) for task in dones: print("Task ret:", task.result()) # 返回 task 方法的 返回值 # results = await asyncio.gather(*tasks) # for result in results: # print("Task ret:",result) start = now() loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) print("Time:", now() - start)
gather和wait 的區(qū)別
把多個(gè)協(xié)程注冊(cè)進(jìn)一個(gè)事件循環(huán)中的兩種方法
使用方式區(qū)別
1.使用asyncio.wait()
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
2.使用asyncio.gather()
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) # *接收args參數(shù)
接收參數(shù)區(qū)別
asyncio.wait
參數(shù)必須是list對(duì)象 ,list 對(duì)象存放多個(gè) task object
用asyncio.ensure_future轉(zhuǎn)為task對(duì)象
tasks=[ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
不轉(zhuǎn)為task對(duì)象
loop = asyncio.get_event_loop() tasks=[ coroutine1, coroutine2, coroutine3 ] loop.run_until_complete(asyncio.wait(tasks))
asyncio.gather
必須用 *
來(lái)接收 list 對(duì)象
tasks=[ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks))
返回結(jié)果區(qū)別
asyncio.wait
asyncio.wait
返回dones
和pendings
- dones:表示已經(jīng)完成的任務(wù)
- pendings:表示未完成的任務(wù)
我們需要手動(dòng)去獲取結(jié)果
dones, pendings = await asyncio.wait(tasks) print(dones, pendings) for task in dones: print("Task ret:", task.result())
asyncio.gather
它的返回值就是 return的結(jié)果 ,不用再task.result() 來(lái)獲取
# 返回 task 方法的 返回值 results = await asyncio.gather(*tasks) for result in results: print("Task ret:",result)
另 asyncio.wait 帶有控制功能
【控制運(yùn)行任務(wù)數(shù)】:運(yùn)行第一個(gè)任務(wù)就返回
- FIRST_COMPLETED :第一個(gè)任務(wù)完全返回
- FIRST_EXCEPTION:產(chǎn)生第一個(gè)異常返回
- ALL_COMPLETED:所有任務(wù)完成返回 (默認(rèn)選項(xiàng))
import asyncio import random async def coro(tag): print(">", tag) await asyncio.sleep(random.uniform(0.5, 5)) print("<", tag) return tag loop = asyncio.get_event_loop() tasks = [coro(i) for i in range(1, 11)] # 第一次wait 完成情況 print("Get first result:") finished, unfinished = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) # 第一個(gè)任務(wù)完全返回 for task in finished: print(task.result()) print("unfinished:", len(unfinished)) # 繼續(xù)第一次未完成任務(wù) print("Get more results in 2 seconds:") finished2, unfinished2 = loop.run_until_complete( asyncio.wait(unfinished, timeout=2)) # 超時(shí)2s 返回 for task in finished2: print(task.result()) print("unfinished2:", len(unfinished2)) # 繼續(xù)第2次未完成任務(wù) print("Get all other results:") finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2)) # ALL_COMPLETED:所有任務(wù)完成返回 (默認(rèn)項(xiàng)) for task in finished3: print(task.result()) loop.close()
動(dòng)態(tài)添加協(xié)程
很多時(shí)候,我們的事件循環(huán)用于注冊(cè)協(xié)程,而有的協(xié)程需要?jiǎng)討B(tài)的添加到事件循環(huán)中。一個(gè)簡(jiǎn)單的方式就是使用多線程。當(dāng)前線程創(chuàng)建一個(gè)事件循環(huán),然后在新建一個(gè)線程,在新線程中啟動(dòng)事件循環(huán)。當(dāng)前線程不會(huì)被block
相關(guān)函數(shù)介紹:
- loop.call_soon_threadsafe() :與 call_soon()類(lèi)似,等待此函數(shù)返回后馬上調(diào)用回調(diào)函數(shù),返回值是一個(gè) asyncio.Handle 對(duì)象,此對(duì)象內(nèi)只有一個(gè)方法為 cancel()方法,用來(lái)取消回調(diào)函數(shù)。
- loop.call_soon() : 與call_soon_threadsafe()類(lèi)似,call_soon_threadsafe() 是線程安全的
- loop.call_later():延遲多少秒后執(zhí)行回調(diào)函數(shù)
- loop.call_at():在指定時(shí)間執(zhí)行回調(diào)函數(shù),這里的時(shí)間統(tǒng)一使用 loop.time() 來(lái)替代 time.sleep()
- asyncio.run_coroutine_threadsafe(): 動(dòng)態(tài)的加入?yún)f(xié)程,參數(shù)為一個(gè)回調(diào)函數(shù)和一個(gè)loop對(duì)象,返回值為future對(duì)象,通過(guò)future.result()獲取回調(diào)函數(shù)返回值
動(dòng)態(tài)添加協(xié)程同步方式通過(guò)調(diào)用 call_soon_threadsafe()函數(shù),傳入一個(gè)回調(diào)函數(shù)callback和一個(gè)位置參數(shù)
注意:同步方式,回調(diào)函數(shù) more_work()為普通函數(shù)
import asyncio from threading import Thread import time now = lambda: time.time() def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start)) new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3) print('here')
啟動(dòng)上述代碼之后,當(dāng)前線程不會(huì)被block,新線程中會(huì)按照順序執(zhí)行call_soon_threadsafe方法注冊(cè)的more_work方法, 后者因?yàn)閠ime.sleep操作是同步阻塞的,因此運(yùn)行完畢more_work需要大致6 + 3
異步方式
import asyncio import time from threading import Thread now = lambda: time.time() def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start)) asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主線程中創(chuàng)建一個(gè)new_loop,然后在另外的子線程中開(kāi)啟一個(gè)無(wú)限事件循環(huán)。
主線程通過(guò)run_coroutine_threadsafe新注冊(cè)協(xié)程對(duì)象。這樣就能在子線程中進(jìn)行事件循環(huán)的并發(fā)操作,同時(shí)主線程又不會(huì)被block。一共執(zhí)行的時(shí)間大概在6s左右。
協(xié)程的停止
future對(duì)象有幾個(gè)狀態(tài):
Pending
Running
Done
Cacelled
創(chuàng)建future的時(shí)候,task為pending,事件循環(huán)調(diào)用執(zhí)行的時(shí)候當(dāng)然就是running,調(diào)用完畢自然就是done,如果需要停止事件循環(huán),就需要先把task取消??梢允褂胊syncio.Task獲取事件循環(huán)的task
import asyncio import time now = lambda: time.time() async def do_some_work(x): print("Waiting:", x) await asyncio.sleep(x) return "Done after {}s".format(x) coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(2) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] start = now() loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): print(task.cancel()) loop.stop() loop.run_forever() finally: loop.close() print("Time:", now() - start)
啟動(dòng)事件循環(huán)之后,馬上ctrl+c,會(huì)觸發(fā)run_until_complete的執(zhí)行異常 KeyBorardInterrupt。然后通過(guò)循環(huán)asyncio.Task取消future。
True表示cannel成功,loop stop之后還需要再次開(kāi)啟事件循環(huán),最后在close,不然還會(huì)拋出異常
循環(huán)task,逐個(gè)cancel是一種方案,可是正如上面我們把task的列表封裝在main函數(shù)中,main函數(shù)外進(jìn)行事件循環(huán)的調(diào)用。這個(gè)時(shí)候,main相當(dāng)于最外出的一個(gè)task,那么處理包裝的main函數(shù)即可。
協(xié)程中生產(chǎn)-消費(fèi)模型設(shè)計(jì)
通過(guò)上面的動(dòng)態(tài)添加協(xié)程的思想,我們可以設(shè)計(jì)一個(gè)生產(chǎn)-消費(fèi)的模型,至于中間件(管道)是什么無(wú)所謂,下面以?xún)?nèi)置隊(duì)列和redis隊(duì)列來(lái)舉例說(shuō)明。
提示:若想主線程退出時(shí),子線程也隨之退出,需要將子線程設(shè)置為守護(hù)線程,函數(shù) setDaemon(True)
import asyncio from threading import Thread from collections import deque import random import time def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def consumer(): while True: if dq: msg = dq.pop() if msg: asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop) async def thread_example(name): print('正在執(zhí)行name:', name) await asyncio.sleep(2) return '返回結(jié)果:' + name dq = deque() new_loop = asyncio.new_event_loop() loop_thread = Thread(target= start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() consumer_thread = Thread(target= consumer) consumer_thread.setDaemon(True) consumer_thread.start() while True: i = random.randint(1, 10) dq.appendleft(str(i)) time.sleep(2)
redis隊(duì)列模型
生產(chǎn)者代碼:
import redis conn_pool = redis.ConnectionPool(host='127.0.0.1') redis_conn = redis.Redis(connection_pool=conn_pool) redis_conn.lpush('coro_test', '1') redis_conn.lpush('coro_test', '2') redis_conn.lpush('coro_test', '3') redis_conn.lpush('coro_test', '4')
消費(fèi)者代碼:
import asyncio from threading import Thread import redis def get_redis(): conn_pool = redis.ConnectionPool(host= '127.0.0.1') return redis.Redis(connection_pool= conn_pool) def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def thread_example(name): print('正在執(zhí)行name:', name) await asyncio.sleep(2) return '返回結(jié)果:' + name redis_conn = get_redis() new_loop = asyncio.new_event_loop() loop_thread = Thread(target= start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() #循環(huán)接收redis消息并動(dòng)態(tài)加入?yún)f(xié)程 while True: msg = redis_conn.rpop('coro_test') if msg: asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)
asyncio在aiohttp中的應(yīng)用
aiohttp是一個(gè)異步庫(kù),分為客戶(hù)端和服務(wù)端,下面只是簡(jiǎn)單對(duì)客戶(hù)端做個(gè)介紹以及一個(gè)經(jīng)常遇到的異常情況。
aiohttp客戶(hù)端為異步網(wǎng)絡(luò)請(qǐng)求庫(kù)
import asyncio import aiohttp count = 0 async def get_http(url): async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) def main(): loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(10)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() if __name__ == '__main__': main()
aiohttp并發(fā)量太大的異常解決方案
在使用aiohttp客戶(hù)端進(jìn)行大量并發(fā)請(qǐng)求時(shí),程序會(huì)拋出 ValueError: too many file descriptors in select() 的錯(cuò)誤。
異常代碼示例
說(shuō)明:測(cè)試機(jī)器為windows系統(tǒng)
import asyncio import aiohttp count = 0 async def get_http(url): async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) def main(): loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(600)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() if __name__ == '__main__': main()
原因分析:使用aiohttp時(shí),python內(nèi)部會(huì)使用select(),操作系統(tǒng)對(duì)文件描述符最大數(shù)量有限制,linux為1024個(gè),windows為509個(gè)。
解決方案:
最常見(jiàn)的解決方案是:限制并發(fā)數(shù)量(一般500),若并發(fā)的量不大可不作限制。其他方案這里不做介紹,如windows下使用loop = asyncio.ProactorEventLoop() 以及使用回調(diào)方式等
限制并發(fā)數(shù)量方法
提示:此方法也可用來(lái)作為異步爬蟲(chóng)的限速方法(反反爬)
使用semaphore = asyncio.Semaphore(500) 以及在協(xié)程中使用 async with semaphore: 操作
具體代碼如下:
import asyncio import aiohttp async def get_http(url): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) if __name__ == '__main__': count = 0 semaphore = asyncio.Semaphore(500) loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(600)] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
在線程或進(jìn)程池中執(zhí)行代碼
在《流暢的python》中有這樣一段話。
函數(shù)(例如io讀寫(xiě),requests網(wǎng)絡(luò)請(qǐng)求)阻塞了客戶(hù)代碼與asycio事件循環(huán)的唯一線程,因此在執(zhí)行調(diào)用時(shí),整個(gè)應(yīng)用程序都會(huì)凍結(jié)。這個(gè)問(wèn)題的解決方法是,使用事件循環(huán)對(duì)象的 run_in_executor方法。
asyncio的事件循環(huán)在背后維護(hù)著一個(gè)ThreadPoolExecutor對(duì)象,我們可以調(diào)用run_in_executor方法,把可調(diào)用對(duì)象發(fā)給它執(zhí)行。
import asyncio from time import sleep, strftime from concurrent import futures executor = futures.ThreadPoolExecutor(max_workers=5) async def blocked_sleep(name, t): print(strftime('[%H:%M:%S]'), end=' ') print('sleep {} is running {}s'.format(name, t)) loop = asyncio.get_event_loop() await loop.run_in_executor(executor, sleep, t) print(strftime('[%H:%M:%S]'), end=' ') print('sleep {} is end'.format(name)) return t async def main(): future = (blocked_sleep(i, i) for i in range(1, 6)) fs = asyncio.gather(*future) return await fs loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) print('results: {}'.format(results))
在同一個(gè)線程里,兩個(gè) event loop 無(wú)法同時(shí) run,但這不能阻止您用兩個(gè)線程分別跑兩個(gè) event loop,
其次再說(shuō) ThreadPoolExecutor。您也可以看到,它根本不是 asyncio 庫(kù)的東西。當(dāng)您創(chuàng)建一個(gè) ThreadPoolExecutor 對(duì)象時(shí),您實(shí)際上是創(chuàng)建了一個(gè)線程池。
僅此而已,與 asyncio、event loop 并無(wú)瓜葛。而當(dāng)您明確使用一個(gè) event loop 的 run_in_executor() 方法時(shí),其實(shí)底層做的只有兩件事:
1,用線程池執(zhí)行給定函數(shù),與 asyncio 毫無(wú)關(guān)系;
2,給線程池執(zhí)行結(jié)果增加一個(gè)回調(diào),該回調(diào)會(huì)在 event loop 的下一次循環(huán)中保存執(zhí)行結(jié)果。
所以 run_in_executor() 只是將傳統(tǒng)的線程池結(jié)果拉回到給定 event loop 中,以便進(jìn)一步處理而已,不存在誰(shuí)共享誰(shuí)的關(guān)系,指定誰(shuí)是誰(shuí)。您可以嘗試一下,在多個(gè)線程中跑多個(gè) event loop,然后都向同一個(gè)線程池扔任務(wù),然后返回結(jié)果:
import asyncio import threading import time from concurrent.futures.thread import ThreadPoolExecutor e = ThreadPoolExecutor() def worker(index): print(index, 'before:', time.strftime('%X')) time.sleep(1) print(index, 'after:', time.strftime('%X')) return index def main(index): loop = asyncio.new_event_loop() res = loop.run_until_complete(loop.run_in_executor(e, worker, index)) print('Thread', index, 'got result', res) threads = [] for i in range(5): t = threading.Thread(target=main, args=(i,)) t.start() threads.append(t) for t in threads: t.join()
不同于上面的方法,這里是把阻塞的方法放到新的線程里跑。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析
這篇文章主要介紹了基于python實(shí)現(xiàn)MQTT發(fā)布訂閱過(guò)程原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Python實(shí)現(xiàn)統(tǒng)計(jì)代碼行的方法分析
這篇文章主要介紹了Python實(shí)現(xiàn)統(tǒng)計(jì)代碼行的方法,結(jié)合實(shí)例形式分析了Python針對(duì)代碼行數(shù)的計(jì)算實(shí)現(xiàn)步驟與操作技巧,需要的朋友可以參考下2017-07-07使用pyecharts在jupyter notebook上繪圖
這篇文章主要介紹了使用pyecharts在jupyter notebook上繪圖,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-07-07Python re.split方法分割字符串的實(shí)現(xiàn)示例
本文主要介紹了Python re.split方法分割字符串的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08python使用SimpleXMLRPCServer實(shí)現(xiàn)簡(jiǎn)單的rpc過(guò)程
這篇文章主要介紹了python使用SimpleXMLRPCServer實(shí)現(xiàn)簡(jiǎn)單的rpc過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06python re.sub()替換正則的匹配內(nèi)容方法
今天小編就為大家分享一篇python re.sub()替換正則的匹配內(nèi)容方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07Python實(shí)戰(zhàn)之能監(jiān)控文件變化的神器—看門(mén)狗
這篇文章主要介紹了Python實(shí)戰(zhàn)之能監(jiān)控文件變化的神器—看門(mén)狗,文中有非常詳細(xì)的圖文及代碼示例,對(duì)正在學(xué)習(xí)python的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-05-05