Python asyncio的基本使用示例教程
Asyncio在python3.6、3.7的基本使用方式
- 什么是事件循環(huán)(event loop):
? 一方面,它類(lèi)似于 CPU ,順序執(zhí)行協(xié)程的代碼;另一方面,它相當(dāng)于操作系統(tǒng),完成協(xié)程的調(diào)度,即一個(gè)協(xié)程“暫停”時(shí),決定接下來(lái)執(zhí)行哪個(gè)協(xié)程。
- 可等待對(duì)象(Future、協(xié)程、Task):
? 把可等待對(duì)象注冊(cè)進(jìn)事件循環(huán)里,事件循環(huán)會(huì)安排他們執(zhí)行并返回結(jié)果
import asyncio # 用async定義一個(gè)協(xié)程 async def wait_and_print(wait_time, name): # 這是一個(gè)模擬io阻塞的定時(shí)器,在sleep(或者io)的時(shí)候, # 協(xié)程會(huì)把控制權(quán)交還給event_loop,讓他去執(zhí)行別的協(xié)程 await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') # 直接 wait_and_print(2, 'a') 這樣不能運(yùn)行一個(gè)協(xié)程,要將它注冊(cè)入事件循環(huán)中 # 在python3.7 # asyncio.run(wait_and_print(2, 'a')) # 在python3.6 loop = asyncio.get_event_loop() # run_until_complete 這個(gè)函數(shù)會(huì)阻塞運(yùn)行直到里面的協(xié)程運(yùn)行完畢返回結(jié)果 loop.run_until_complete(wait_and_print(2, 'a'))
結(jié)果輸出:wait_time:2, name:a
常用到的對(duì)象和函數(shù)及其使用場(chǎng)景
協(xié)程對(duì)象:要運(yùn)行一個(gè)協(xié)程對(duì)象,必須把他注冊(cè)到事件循環(huán)里,那么有三種方法
- 直接注冊(cè)到event_loop中
- 在別的協(xié)程中await它,間接的注冊(cè)到事件循環(huán)里
- 把它包裝成一個(gè)Task對(duì)象,此時(shí)會(huì)直接注冊(cè)入’下一輪’的事件循環(huán)中
import asyncio async def wait_and_print(wait_time, name): # await會(huì)阻塞直到后面跟的協(xié)程運(yùn)行完畢返回結(jié)果 await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') # # 1.直接注冊(cè)入事件循環(huán) # loop = asyncio.get_event_loop() # loop.run_until_complete(wait_and_print(2, 'a')) # # # # 2.間接注冊(cè)入事件循環(huán) # # 在其他協(xié)程中await它,再把那個(gè)協(xié)程注冊(cè)入事件循環(huán) # async def run(): # await wait_and_print(2, 'a') # # # loop = asyncio.get_event_loop() # loop.run_until_complete(run()) # # 3.打包成Task對(duì)象 # async def run(): # 關(guān)于下面兩個(gè)函數(shù)的區(qū)別后面會(huì)在說(shuō)完Task和Future對(duì)象后進(jìn)行討論 # python3.6 # asyncio.ensure_future(wait_and_print(2, 'a')) # python3.7 print(asyncio.create_task(wait_and_print(2, 'a'))) await asyncio.sleep(3) loop = asyncio.get_event_loop() loop.run_until_complete(run())
結(jié)果輸出:
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/1.py:4>>
wait_time:2, name:a
Task
把協(xié)程對(duì)象打包成一個(gè)Task,這個(gè)過(guò)程會(huì)直接注冊(cè)這個(gè)協(xié)程進(jìn)入事件循環(huán)中
它的主要作用是幫助event_loop進(jìn)行調(diào)度,如果Task發(fā)現(xiàn)它打包的協(xié)程在await一個(gè)對(duì)象(執(zhí)行一個(gè)io操作),則該Task會(huì)掛起該協(xié)程的執(zhí)行,把控制權(quán)交回event_loop,讓他去運(yùn)行其他Task,當(dāng)該對(duì)象運(yùn)行完畢,Task會(huì)將打包的協(xié)程恢復(fù)運(yùn)行
Future
Task的父類(lèi),一個(gè)比較底層的對(duì)象,用于支持底層回調(diào)式代碼與高層異步/等待式代碼交互,它有一些方法,例如result(),set_result(),done(),cancelled(),cancel()等,但是一般不會(huì)使用Future對(duì)象,如果要使用也是在Task對(duì)象使用
enture_future()和create_task()
一般的使用就是傳入一個(gè)協(xié)程對(duì)象,然后打包成一個(gè)task對(duì)象后返回,但是enture_future()還有別的用法,下面是python作者相關(guān)的討論
大概的翻譯如下:
- 如果有個(gè)可能是協(xié)程或者future或者task的對(duì)象,你希望能對(duì)它使用僅在Future類(lèi)中定義過(guò)的方法(這里或許唯一能舉出的有用的函數(shù)的例子就是cancel()),就對(duì)它使用entrue_future。這個(gè)函數(shù)會(huì)在當(dāng)傳入的對(duì)象已經(jīng)是一個(gè)Future或者Task,就啥也不干,原封不動(dòng)返回它,當(dāng)它是一個(gè)協(xié)程則把他打包成一個(gè)Task并返回。
- 現(xiàn)版本來(lái)說(shuō)(python3.7)如果你就是想把一個(gè)協(xié)程注冊(cè)入事件循環(huán)運(yùn)行,那就該用create_task(),如果是想提供一個(gè)底層些的API(類(lèi)似asyncio自帶的api)時(shí)候要用到Future對(duì)象,那就用enture_future()
asyncio.wait_for()
- wait_for會(huì)檢查傳進(jìn)去的對(duì)象,如果是協(xié)程則會(huì)打包成task
- 等待可等待對(duì)象完成,并可以設(shè)置超時(shí)
- 這里涉及到一個(gè)取消task/future的概念,也就是之前提到的future.cancel(),如果超時(shí)則把future取消
import asyncio async def wait_and_print(wait_time, name): await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') async def run(): # 為了我們方便查看這個(gè)任務(wù)的后續(xù)狀態(tài),先打包成一個(gè)task_ task_ = asyncio.create_task(wait_and_print(1000, 'a')) try: await asyncio.wait_for(task_, timeout=3) except asyncio.TimeoutError: print('發(fā)現(xiàn)任務(wù)超時(shí),已被取消') # 查看被取消后的狀態(tài) print(task_) print(task_.cancelled()) asyncio.run(run())
結(jié)果輸出:
發(fā)現(xiàn)任務(wù)超時(shí),已被取消
<Task cancelled coro=<wait_and_print() done, defined at /Users/frostw/Documents/share_new/share/第二部分/2.py:4>>
True
asyncio.gather()
- 并發(fā)的運(yùn)行協(xié)程,或者task,并把他們的結(jié)果按照順序放在列表中返回
- gather會(huì)檢查傳進(jìn)去的對(duì)象,如果是協(xié)程則會(huì)打包成task
import asyncio async def wait_and_print(wait_time, name): await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') # 添加一個(gè)return return f'wait_time:{wait_time}, name:{name}' # loop = asyncio.get_event_loop() # # 會(huì)把三個(gè)協(xié)程打包成任務(wù)task加入循環(huán) # gather_result = loop.run_until_complete(asyncio.gather(wait_and_print(1, 'a'), # wait_and_print(2, 'b'), # wait_and_print(1, 'c'))) # print(gather_result) async def wait_and_cancel_task(wait_time, task): await asyncio.sleep(wait_time) task.cancel() print(f'{task} 被取消了') return f'{task} 被取消了' async def run(): # 封裝一個(gè)b_task b_task = asyncio.create_task(wait_and_print(5, 'b')) print(b_task) gather_ = asyncio.gather(wait_and_print(1, 'a'), b_task, wait_and_print(3, 'c'), # 在這里把他給取消了 wait_and_cancel_task(4, b_task), return_exceptions=True) print(await gather_) loop = asyncio.get_event_loop() loop.run_until_complete(run())
結(jié)果輸出:
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:4>>
wait_time:1, name:a
wait_time:3, name:c
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:5> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:691]> 被取消了
['wait_time:1, name:a', CancelledError(), 'wait_time:3, name:c', '<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:5> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:691]> 被取消了']
- 協(xié)程并不一定按順序執(zhí)行,但結(jié)果是按順序返回
- 關(guān)于被取消:
- return_exceptions 參數(shù)為False(默認(rèn)):所有協(xié)程任務(wù)所引發(fā)的首個(gè)異常會(huì)立刻作為gather的結(jié)果返回,但其他任務(wù)不會(huì)被取消
- return_exceptions 參數(shù)為T(mén)rue:所有協(xié)程任務(wù)所引發(fā)的異常不會(huì)立刻返回,會(huì)聚合在列表中當(dāng)作普通結(jié)果返回
- 如果gather本身被取消,那它所有的協(xié)程任務(wù)都會(huì)被取消
結(jié)合例子討論loop的調(diào)度過(guò)程
官網(wǎng)示例1
import asyncio async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
示例2
import asyncio async def wait_and_print(wait_time, name): await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') async def run(): asyncio.create_task(wait_and_print(1, 'a')) await wait_and_print(1, 'b') if __name__ == '__main__': asyncio.run(run()) # 原因:create_task會(huì)把任務(wù)自動(dòng)安排到下一輪事件循環(huán)
結(jié)果輸出:
wait_time:1, name:b
wait_time:1, name:a
示例3
import asyncio async def wait_and_print(wait_time, name): await asyncio.sleep(wait_time) print(f'wait_time:{wait_time}, name:{name}') async def run(): asyncio.create_task(wait_and_print(1, 'a')) await asyncio.sleep(0) await wait_and_print(1, 'b') if __name__ == '__main__': asyncio.run(run()) # 原因:await asyncio.sleep(0)會(huì)使得task將控制權(quán)交還給loop進(jìn)行下一個(gè)任務(wù)
結(jié)果輸出:
wait_time:1, name:a
wait_time:1, name:b
示例4
import asyncio async def wait_and_print(wait_times, name): await asyncio.sleep(wait_times) print(f'wait_times:{wait_times} name:{name}') async def run(): asyncio.ensure_future(asyncio.ensure_future(wait_and_print(1, 'f'))) asyncio.ensure_future(wait_and_print(1, 'b')) asyncio.ensure_future(asyncio.gather(wait_and_print(1, 'd'))) asyncio.create_task(wait_and_print(1, 'a')) await wait_and_print(1, 'c') asyncio.ensure_future(asyncio.gather(wait_and_print(2, 'g'), wait_and_print(0, 'h'))) await wait_and_print(1, 'e') # c>f>b>d>a>h>e> if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(run())
wait_times:1 name:c
wait_times:1 name:f
wait_times:1 name:b
wait_times:1 name:d
wait_times:1 name:a
wait_times:0 name:h
wait_times:1 name:e
基于Asyncio的發(fā)布訂閱程序
import asyncio import random async def mama(zhuo_zi): while True: zheng_bao_zi = random.randint(1, 10) await asyncio.sleep(zheng_bao_zi) pan_zi = random.randint(1, 10) [await zhuo_zi.put(bao_zi) for bao_zi in range(pan_zi)] print(f'媽媽花了{(lán)zheng_bao_zi}秒,上桌了{(lán)pan_zi}個(gè)包子') print(f'現(xiàn)在桌子上的包子總數(shù):{zhuo_zi.qsize()}') async def xiao_ming(zhuo_zi): while True: if zhuo_zi.qsize() == 0: print(f'小明發(fā)現(xiàn)沒(méi)包子了,等一會(huì)') await asyncio.sleep(1) else: await zhuo_zi.get() chi_bao_zi = random.randint(1, 3) await asyncio.sleep(chi_bao_zi) print(f'小明花了{(lán)chi_bao_zi}秒吃了個(gè)包子') async def run(mama_number, xiao_ming_number): zhuo_zi = asyncio.Queue() mama_tasks = [asyncio.create_task(mama(zhuo_zi)) for number in range(mama_number)] xiao_ming_tasks = [asyncio.create_task(xiao_ming(zhuo_zi)) for number in range(xiao_ming_number)] return mama_tasks, xiao_ming_tasks if __name__ == '__main__': loop = asyncio.get_event_loop() mama_tasks, xiao_ming_tasks = loop.run_until_complete(run(mama_number=3, xiao_ming_number=8)) loop.run_forever()
簡(jiǎn)易的通過(guò)協(xié)程池發(fā)送請(qǐng)求的爬蟲(chóng)
import asyncio import logging import time import json log = logging def parsed_response(response): """ 解析 response :param : response byte :return: status_code int headers dict str body """ r = response.decode() header, body = r.split('\r\n\r\n', 1) h = header.split('\r\n') status_code = int(h[0].split()[1]) headers = {} for line in h[1:]: k, v = line.split(': ') headers[k] = v return status_code, headers, body def parse_url(url): """ 對(duì) url 進(jìn)行字符串拆分 :param url: http://httpbin.org/headers :return: (protocol, host, port, path) """ # 檢查協(xié)議 protocol = 'http' if url[:7] == 'http://': u = url.split('://')[1] elif url[:8] == 'https://': protocol = 'https' u = url.split('://')[1] else: u = url # https://g.cn:1234/hello # g.cn:1234/hello # 檢查默認(rèn) path i = u.find('/') if i == -1: host = u path = '/' else: host = u[:i] path = u[i:] # 檢查端口 port_dict = { 'http': 80, 'https': 443, } # 默認(rèn)端口 port = port_dict[protocol] if ':' in host: h = host.split(':') host = h[0] port = int(h[1]) return protocol, host, port, path def save_response(queue, save_name='data'): """ 將一個(gè)隊(duì)列里的數(shù)據(jù)保存到j(luò)son文本 :param queue: asyncio.Queue :param save_name: str json文本的name :return: """ data = [] for i in range(queue.qsize()): status_code, headers, body = queue.get_nowait() save_dict = dict(id=i, status_code=status_code, headers=headers, body=body) data.append(save_dict) s = json.dumps(data, indent=2, ensure_ascii=False) with open(save_name, 'w', encoding='utf-8') as f: f.write(s) async def connect(host, port, path='headers', tls=False): """ :param host: str: httpbin.org :param port: int: 80/443 :param tls: bool :param path: str: headers/ :return: response content (bytes) """ log.debug(f'connect to {host}:{port}, tls:{tls}') if tls is True: protocol = 'https' reader, writer = await asyncio.open_connection( host, port, ssl=True) else: protocol = 'http' reader, writer = await asyncio.open_connection( host, port) log.debug(f'{host}:{port}, tls:{tls} connected') request = f'GET {protocol}://{host}:{port}/{path} HTTP/1.1\r\nHost:{host}\r\n\r\n' # 由于設(shè)置了timeout, 在以下操作任意階段皆有可能超時(shí)導(dǎo)致協(xié)程任務(wù)被丟棄 # 所以必須添加finally字段用來(lái)正確關(guān)閉連接 try: log.debug(f'request send--{request}') writer.write(request.encode('utf-8')) await writer.drain() log.debug(f'request send done--{request}') log.debug(f'response recv start') response = b'' # 接收response while True: line = await reader.readline() response += line if line == b'}\n': break log.debug(f'response recv done') log.debug(f'close connect') writer.close() return response finally: writer.close() async def http_request(url): """ 返回請(qǐng)求后的狀態(tài)碼,頁(yè)面內(nèi)容 :param url: http://httpbin.org/headers :return: state_code, header, body """ protocol, host, port, path = parse_url(url) if port == 443: response = await connect(host, port, tls=True) else: response = await connect(host, port, tls=False) status_code, headers, body = parsed_response(response) return status_code, headers, body async def create_one_coroutine(url_queue, coros_queue, response_queue, task_timeout=1): """ :param url_queue: :param coros_queue: :param response_queue: :param task_timeout: :return: """ # 用當(dāng)前協(xié)程池隊(duì)列大小給予協(xié)程id thread_id = coros_queue.qsize() log.info(f'添加一個(gè)協(xié)程{thread_id}') # 將協(xié)程放入?yún)f(xié)程池隊(duì)列 await coros_queue.put(thread_id) while True: # 如果urls任務(wù)隊(duì)列空了,則從協(xié)程池隊(duì)列中去掉一個(gè)協(xié)程 if url_queue.empty() is True: await coros_queue.get() log.info(f'協(xié)程{thread_id}關(guān)閉') break url = await url_queue.get() log.info(f'協(xié)程{thread_id}從隊(duì)列中拿取一個(gè)url, 還剩{url_queue.qsize()}個(gè)url') try: # 將http_request(url)返回的response裝入response隊(duì)列中 await response_queue.put( await asyncio.wait_for(http_request(url), timeout=task_timeout)) # asyncio.wait_for()超時(shí)會(huì)引發(fā)一個(gè)TimeoutError except asyncio.TimeoutError: log.info(f'協(xié)程{thread_id}請(qǐng)求超時(shí),丟棄這次請(qǐng)求') else: log.info(f'協(xié)程{thread_id}成功獲取一次響應(yīng)') async def if_threads_done(threads_queue, response_queue): """ 監(jiān)視urls協(xié)程的管理協(xié)程, 每1秒對(duì)urls協(xié)程隊(duì)列進(jìn)行一次判定, 如果urls協(xié)程隊(duì)列已空則關(guān)閉loop :param response_queue: asyncio.Queue :param threads_queue: asyncio.Queue :return: """ start_time = time.time() while True: # 每隔一秒檢查一次協(xié)程池隊(duì)列是否為空 await asyncio.sleep(1) if threads_queue.empty() is True: log.info(f'所有協(xié)程已經(jīng)關(guān)閉,關(guān)閉loop') break # 如果協(xié)程池隊(duì)列已經(jīng)沒(méi)有協(xié)程在運(yùn)行,則統(tǒng)計(jì)時(shí)間->保存responses->關(guān)閉loop log.info(f'獲取成功的響應(yīng)數(shù)量:{response_queue.qsize()}') save_response(response_queue, save_name='response.txt') use_time = time.time() - start_time log.info(f'總耗時(shí):{str(use_time)}') log.info(f'響應(yīng)已存入response.txt') loop = asyncio.get_running_loop() loop.stop() async def run_with_pool(urls, threads=5, task_timeout=5): """ :param urls: list :param threads: int :param task_timeout: int :return: loop """ # 創(chuàng)建一個(gè)url隊(duì)列,用于協(xié)程提取url url_queue = asyncio.Queue() # 創(chuàng)建一個(gè)coroutines隊(duì)列,用于管理協(xié)程 coros_queue = asyncio.Queue() # 創(chuàng)建一個(gè)response隊(duì)列,用于保存response response_queue = asyncio.Queue() # 將urls列表加入隊(duì)列 [url_queue.put_nowait(url) for url in urls] # 創(chuàng)建協(xié)程池 coros = [create_one_coroutine(url_queue, coros_queue, response_queue, task_timeout) for i in range(0, threads)] [asyncio.create_task(coro) for coro in coros] # 創(chuàng)建一個(gè)監(jiān)視urls協(xié)程的管理協(xié)程, 每1秒對(duì)urls協(xié)程隊(duì)列進(jìn)行一次判定,如果urls協(xié)程隊(duì)列已空則關(guān)閉loop asyncio.create_task(if_threads_done(coros_queue, response_queue)) def main(): tasks = [ 'http://httpbin.org' ] * 50 tasks += [ 'https://httpbin.org' ] * 50 loop = asyncio.get_event_loop() loop.create_task(run_with_pool(tasks, threads=15, task_timeout=3)) try: loop.run_forever() finally: loop.close() if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main()
到此這篇關(guān)于Python asyncio的基本使用的文章就介紹到這了,更多相關(guān)Python asyncio使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Django自定義模板標(biāo)簽template_tags的用處
這篇文章主要介紹了淺談Django自定義模板標(biāo)簽template_tags的用處,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12如何利用Boost.Python實(shí)現(xiàn)Python C/C++混合編程詳解
這篇文章主要給大家介紹了關(guān)于如何利用Boost.Python實(shí)現(xiàn)Python C/C++混合編程的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起看看吧2018-11-11利用python實(shí)現(xiàn)數(shù)據(jù)分析
為什么要利用python進(jìn)行數(shù)據(jù)分析?python擁有一個(gè)巨大的活躍的科學(xué)計(jì)算社區(qū),擁有不斷改良的庫(kù),能夠輕松的集成C,C++,Fortran代碼(Cython項(xiàng)目),可以同時(shí)用于研究和原型的構(gòu)建以及生產(chǎn)系統(tǒng)的構(gòu)建。2017-01-01Python對(duì)Tornado請(qǐng)求與響應(yīng)的數(shù)據(jù)處理
這篇文章主要介紹了Python對(duì)Tornado請(qǐng)求與響應(yīng)的數(shù)據(jù)處理,需要的朋友可以參考下2020-02-02使用python向MongoDB插入時(shí)間字段的操作
這篇文章主要介紹了使用python向MongoDB插入時(shí)間字段的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-05-05Python3.5實(shí)現(xiàn)的三級(jí)菜單功能示例
這篇文章主要介紹了Python3.5實(shí)現(xiàn)的三級(jí)菜單功能,涉及Python針對(duì)json格式數(shù)據(jù)的讀取、遍歷、查找、判斷等相關(guān)操作技巧,需要的朋友可以參考下2019-03-03編寫(xiě)同時(shí)兼容Python2.x與Python3.x版本的代碼的幾個(gè)示例
這篇文章主要介紹了編寫(xiě)同時(shí)兼容Python2.x與Python3.x版本的代碼的幾個(gè)示例,在Python2.7.x的更新中由于采用了某些Python3中的代碼編寫(xiě)特性、使得在有些原本不同之處編寫(xiě)兼容性代碼成為可能,需要的朋友可以參考下2015-03-03