欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python asyncio的基本使用示例教程

 更新時間:2024年02月05日 10:03:09   作者:__walden  
Task主要作用是幫助event_loop調(diào)度,如果Task發(fā)現(xiàn)它打包的協(xié)程在await一個對象(執(zhí)行一個io操作),則Task會掛起該協(xié)程的執(zhí)行,把控制權交回event_loop,讓他去運行其他Task,當該對象運行完畢,Task將打包的協(xié)程恢復運行,本文介紹Python asyncio使用小結,感興趣的朋友一起看看吧

Asyncio在python3.6、3.7的基本使用方式

  • 什么是事件循環(huán)(event loop):

? 一方面,它類似于 CPU ,順序執(zhí)行協(xié)程的代碼;另一方面,它相當于操作系統(tǒng),完成協(xié)程的調(diào)度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。

  • 可等待對象(Future、協(xié)程、Task):

? 把可等待對象注冊進事件循環(huán)里,事件循環(huán)會安排他們執(zhí)行并返回結果

import asyncio
# 用async定義一個協(xié)程
async def wait_and_print(wait_time, name):
    # 這是一個模擬io阻塞的定時器,在sleep(或者io)的時候,
    # 協(xié)程會把控制權交還給event_loop,讓他去執(zhí)行別的協(xié)程
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
# 直接 wait_and_print(2, 'a') 這樣不能運行一個協(xié)程,要將它注冊入事件循環(huán)中
# 在python3.7
# asyncio.run(wait_and_print(2, 'a'))
# 在python3.6
loop = asyncio.get_event_loop()
# run_until_complete 這個函數(shù)會阻塞運行直到里面的協(xié)程運行完畢返回結果
loop.run_until_complete(wait_and_print(2, 'a'))

結果輸出:wait_time:2, name:a

常用到的對象和函數(shù)及其使用場景

協(xié)程對象:要運行一個協(xié)程對象,必須把他注冊到事件循環(huán)里,那么有三種方法

  • 直接注冊到event_loop中
  • 在別的協(xié)程中await它,間接的注冊到事件循環(huán)里
  • 把它包裝成一個Task對象,此時會直接注冊入’下一輪’的事件循環(huán)中
import asyncio
async def wait_and_print(wait_time, name):
    # await會阻塞直到后面跟的協(xié)程運行完畢返回結果
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
# # 1.直接注冊入事件循環(huán)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(wait_and_print(2, 'a'))
#
#
# # 2.間接注冊入事件循環(huán)
# # 在其他協(xié)程中await它,再把那個協(xié)程注冊入事件循環(huán)
# async def run():
#     await wait_and_print(2, 'a')
#
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(run())
# # 3.打包成Task對象
#
async def run():
    # 關于下面兩個函數(shù)的區(qū)別后面會在說完Task和Future對象后進行討論
    # python3.6
    # asyncio.ensure_future(wait_and_print(2, 'a'))
    # python3.7
    print(asyncio.create_task(wait_and_print(2, 'a')))
    await asyncio.sleep(3)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())

結果輸出:
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/1.py:4>>
wait_time:2, name:a

Task

把協(xié)程對象打包成一個Task,這個過程會直接注冊這個協(xié)程進入事件循環(huán)中

它的主要作用是幫助event_loop進行調(diào)度,如果Task發(fā)現(xiàn)它打包的協(xié)程在await一個對象(執(zhí)行一個io操作),則該Task會掛起該協(xié)程的執(zhí)行,把控制權交回event_loop,讓他去運行其他Task,當該對象運行完畢,Task會將打包的協(xié)程恢復運行

Future

Task的父類,一個比較底層的對象,用于支持底層回調(diào)式代碼與高層異步/等待式代碼交互,它有一些方法,例如result(),set_result(),done(),cancelled(),cancel()等,但是一般不會使用Future對象,如果要使用也是在Task對象使用

enture_future()和create_task()

一般的使用就是傳入一個協(xié)程對象,然后打包成一個task對象后返回,但是enture_future()還有別的用法,下面是python作者相關的討論

大概的翻譯如下:

  • 如果有個可能是協(xié)程或者future或者task的對象,你希望能對它使用僅在Future類中定義過的方法(這里或許唯一能舉出的有用的函數(shù)的例子就是cancel()),就對它使用entrue_future。這個函數(shù)會在當傳入的對象已經(jīng)是一個Future或者Task,就啥也不干,原封不動返回它,當它是一個協(xié)程則把他打包成一個Task并返回。
  • 現(xiàn)版本來說(python3.7)如果你就是想把一個協(xié)程注冊入事件循環(huán)運行,那就該用create_task(),如果是想提供一個底層些的API(類似asyncio自帶的api)時候要用到Future對象,那就用enture_future()

asyncio.wait_for()

  • wait_for會檢查傳進去的對象,如果是協(xié)程則會打包成task
  • 等待可等待對象完成,并可以設置超時
  • 這里涉及到一個取消task/future的概念,也就是之前提到的future.cancel(),如果超時則把future取消
import asyncio
async def wait_and_print(wait_time, name):
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
async def run():
    # 為了我們方便查看這個任務的后續(xù)狀態(tài),先打包成一個task_
    task_ = asyncio.create_task(wait_and_print(1000, 'a'))
    try:
        await asyncio.wait_for(task_, timeout=3)
    except asyncio.TimeoutError:
        print('發(fā)現(xiàn)任務超時,已被取消')
        # 查看被取消后的狀態(tài)
        print(task_)
        print(task_.cancelled())
asyncio.run(run())

結果輸出:
發(fā)現(xiàn)任務超時,已被取消
<Task cancelled coro=<wait_and_print() done, defined at /Users/frostw/Documents/share_new/share/第二部分/2.py:4>>
True

asyncio.gather()

  • 并發(fā)的運行協(xié)程,或者task,并把他們的結果按照順序放在列表中返回
  • gather會檢查傳進去的對象,如果是協(xié)程則會打包成task
import asyncio
async def wait_and_print(wait_time, name):
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
    # 添加一個return
    return f'wait_time:{wait_time}, name:{name}'
# loop = asyncio.get_event_loop()
# # 會把三個協(xié)程打包成任務task加入循環(huán)
# gather_result = loop.run_until_complete(asyncio.gather(wait_and_print(1, 'a'),
#                                                        wait_and_print(2, 'b'),
#                                                        wait_and_print(1, 'c')))
# print(gather_result)
async def wait_and_cancel_task(wait_time, task):
    await asyncio.sleep(wait_time)
    task.cancel()
    print(f'{task} 被取消了')
    return f'{task} 被取消了'
async def run():
    # 封裝一個b_task
    b_task = asyncio.create_task(wait_and_print(5, 'b'))
    print(b_task)
    gather_ = asyncio.gather(wait_and_print(1, 'a'),
                             b_task,
                             wait_and_print(3, 'c'),
                             # 在這里把他給取消了
                             wait_and_cancel_task(4, b_task), return_exceptions=True)
    print(await gather_)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())

結果輸出:
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:4>>
wait_time:1, name:a
wait_time:3, name:c
<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:5> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:691]> 被取消了
['wait_time:1, name:a', CancelledError(), 'wait_time:3, name:c', '<Task pending coro=<wait_and_print() running at /Users/frostw/Documents/share_new/share/第二部分/3.py:5> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:691]> 被取消了']

  • 協(xié)程并不一定按順序執(zhí)行,但結果是按順序返回
  • 關于被取消:
    • return_exceptions 參數(shù)為False(默認):所有協(xié)程任務所引發(fā)的首個異常會立刻作為gather的結果返回,但其他任務不會被取消
    • return_exceptions 參數(shù)為True:所有協(xié)程任務所引發(fā)的異常不會立刻返回,會聚合在列表中當作普通結果返回
    • 如果gather本身被取消,那它所有的協(xié)程任務都會被取消

結合例子討論loop的調(diào)度過程

官網(wǎng)示例1

   import asyncio
   async def compute(x, y):
       print("Compute %s + %s ..." % (x, y))
       await asyncio.sleep(1.0)
       return x + y
   async def print_sum(x, y):
       result = await compute(x, y)
       print("%s + %s = %s" % (x, y, result))
   loop = asyncio.get_event_loop()
   loop.run_until_complete(print_sum(1, 2))
   loop.close()

示例2

import asyncio
async def wait_and_print(wait_time, name):
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
async def run():
    asyncio.create_task(wait_and_print(1, 'a'))
    await wait_and_print(1, 'b')
if __name__ == '__main__':
    asyncio.run(run())
# 原因:create_task會把任務自動安排到下一輪事件循環(huán)

結果輸出:
wait_time:1, name:b
wait_time:1, name:a

示例3

import asyncio
async def wait_and_print(wait_time, name):
    await asyncio.sleep(wait_time)
    print(f'wait_time:{wait_time}, name:{name}')
async def run():
    asyncio.create_task(wait_and_print(1, 'a'))
    await asyncio.sleep(0)
    await wait_and_print(1, 'b')
if __name__ == '__main__':
    asyncio.run(run())
# 原因:await asyncio.sleep(0)會使得task將控制權交還給loop進行下一個任務

結果輸出:
wait_time:1, name:a
wait_time:1, name:b

示例4

import asyncio
async def wait_and_print(wait_times, name):
    await asyncio.sleep(wait_times)
    print(f'wait_times:{wait_times}  name:{name}')
async def run():
    asyncio.ensure_future(asyncio.ensure_future(wait_and_print(1, 'f')))
    asyncio.ensure_future(wait_and_print(1, 'b'))
    asyncio.ensure_future(asyncio.gather(wait_and_print(1, 'd')))
    asyncio.create_task(wait_and_print(1, 'a'))
    await wait_and_print(1, 'c')
    asyncio.ensure_future(asyncio.gather(wait_and_print(2, 'g'), wait_and_print(0, 'h')))
    await wait_and_print(1, 'e')
# c>f>b>d>a>h>e>
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

wait_times:1  name:c
wait_times:1  name:f
wait_times:1  name:b
wait_times:1  name:d
wait_times:1  name:a
wait_times:0  name:h
wait_times:1  name:e

基于Asyncio的發(fā)布訂閱程序

import asyncio
import random
async def mama(zhuo_zi):
    while True:
        zheng_bao_zi = random.randint(1, 10)
        await asyncio.sleep(zheng_bao_zi)
        pan_zi = random.randint(1, 10)
        [await zhuo_zi.put(bao_zi) for bao_zi in range(pan_zi)]
        print(f'媽媽花了{zheng_bao_zi}秒,上桌了{pan_zi}個包子')
        print(f'現(xiàn)在桌子上的包子總數(shù):{zhuo_zi.qsize()}')
async def xiao_ming(zhuo_zi):
    while True:
        if zhuo_zi.qsize() == 0:
            print(f'小明發(fā)現(xiàn)沒包子了,等一會')
            await asyncio.sleep(1)
        else:
            await zhuo_zi.get()
            chi_bao_zi = random.randint(1, 3)
            await asyncio.sleep(chi_bao_zi)
            print(f'小明花了{chi_bao_zi}秒吃了個包子')
async def run(mama_number, xiao_ming_number):
    zhuo_zi = asyncio.Queue()
    mama_tasks = [asyncio.create_task(mama(zhuo_zi)) for number in range(mama_number)]
    xiao_ming_tasks = [asyncio.create_task(xiao_ming(zhuo_zi)) for number in range(xiao_ming_number)]
    return mama_tasks, xiao_ming_tasks
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    mama_tasks, xiao_ming_tasks = loop.run_until_complete(run(mama_number=3, xiao_ming_number=8))
    loop.run_forever()

簡易的通過協(xié)程池發(fā)送請求的爬蟲

import asyncio
import logging
import time
import json
log = logging
def parsed_response(response):
    """
       解析 response
       :param : response byte
       :return: status_code int
                headers dict
                str body
    """
    r = response.decode()
    header, body = r.split('\r\n\r\n', 1)
    h = header.split('\r\n')
    status_code = int(h[0].split()[1])
    headers = {}
    for line in h[1:]:
        k, v = line.split(': ')
        headers[k] = v
    return status_code, headers, body
def parse_url(url):
    """
    對 url 進行字符串拆分
    :param url: http://httpbin.org/headers
    :return: (protocol, host, port, path)
    """
    # 檢查協(xié)議
    protocol = 'http'
    if url[:7] == 'http://':
        u = url.split('://')[1]
    elif url[:8] == 'https://':
        protocol = 'https'
        u = url.split('://')[1]
    else:
        u = url
    # https://g.cn:1234/hello
    # g.cn:1234/hello
    # 檢查默認 path
    i = u.find('/')
    if i == -1:
        host = u
        path = '/'
    else:
        host = u[:i]
        path = u[i:]
    # 檢查端口
    port_dict = {
        'http': 80,
        'https': 443,
    }
    # 默認端口
    port = port_dict[protocol]
    if ':' in host:
        h = host.split(':')
        host = h[0]
        port = int(h[1])
    return protocol, host, port, path
def save_response(queue, save_name='data'):
    """
    將一個隊列里的數(shù)據(jù)保存到json文本
    :param queue: asyncio.Queue
    :param save_name: str json文本的name
    :return:
    """
    data = []
    for i in range(queue.qsize()):
        status_code, headers, body = queue.get_nowait()
        save_dict = dict(id=i,
                         status_code=status_code,
                         headers=headers,
                         body=body)
        data.append(save_dict)
    s = json.dumps(data, indent=2, ensure_ascii=False)
    with open(save_name, 'w', encoding='utf-8') as f:
        f.write(s)
async def connect(host, port, path='headers', tls=False):
    """
    :param host: str: httpbin.org
    :param port: int: 80/443
    :param tls:  bool
    :param path: str: headers/
    :return: response content (bytes)
    """
    log.debug(f'connect to {host}:{port}, tls:{tls}')
    if tls is True:
        protocol = 'https'
        reader, writer = await asyncio.open_connection(
            host,
            port,
            ssl=True)
    else:
        protocol = 'http'
        reader, writer = await asyncio.open_connection(
            host,
            port)
    log.debug(f'{host}:{port}, tls:{tls} connected')
    request = f'GET {protocol}://{host}:{port}/{path} HTTP/1.1\r\nHost:{host}\r\n\r\n'
    # 由于設置了timeout, 在以下操作任意階段皆有可能超時導致協(xié)程任務被丟棄
    # 所以必須添加finally字段用來正確關閉連接
    try:
        log.debug(f'request send--{request}')
        writer.write(request.encode('utf-8'))
        await writer.drain()
        log.debug(f'request send done--{request}')
        log.debug(f'response recv start')
        response = b''
        # 接收response
        while True:
            line = await reader.readline()
            response += line
            if line == b'}\n':
                break
        log.debug(f'response recv done')
        log.debug(f'close connect')
        writer.close()
        return response
    finally:
        writer.close()
async def http_request(url):
    """
    返回請求后的狀態(tài)碼,頁面內(nèi)容
    :param url: http://httpbin.org/headers
    :return: state_code, header, body
    """
    protocol, host, port, path = parse_url(url)
    if port == 443:
        response = await connect(host, port, tls=True)
    else:
        response = await connect(host, port, tls=False)
    status_code, headers, body = parsed_response(response)
    return status_code, headers, body
async def create_one_coroutine(url_queue, coros_queue, response_queue, task_timeout=1):
    """
    :param url_queue:
    :param coros_queue:
    :param response_queue:
    :param task_timeout:
    :return:
    """
    # 用當前協(xié)程池隊列大小給予協(xié)程id
    thread_id = coros_queue.qsize()
    log.info(f'添加一個協(xié)程{thread_id}')
    # 將協(xié)程放入?yún)f(xié)程池隊列
    await coros_queue.put(thread_id)
    while True:
        # 如果urls任務隊列空了,則從協(xié)程池隊列中去掉一個協(xié)程
        if url_queue.empty() is True:
            await coros_queue.get()
            log.info(f'協(xié)程{thread_id}關閉')
            break
        url = await url_queue.get()
        log.info(f'協(xié)程{thread_id}從隊列中拿取一個url, 還剩{url_queue.qsize()}個url')
        try:
            # 將http_request(url)返回的response裝入response隊列中
            await response_queue.put(
                await asyncio.wait_for(http_request(url), timeout=task_timeout))
        # asyncio.wait_for()超時會引發(fā)一個TimeoutError
        except asyncio.TimeoutError:
            log.info(f'協(xié)程{thread_id}請求超時,丟棄這次請求')
        else:
            log.info(f'協(xié)程{thread_id}成功獲取一次響應')
async def if_threads_done(threads_queue, response_queue):
    """
    監(jiān)視urls協(xié)程的管理協(xié)程,
    每1秒對urls協(xié)程隊列進行一次判定,
    如果urls協(xié)程隊列已空則關閉loop
    :param response_queue: asyncio.Queue
    :param threads_queue: asyncio.Queue
    :return:
    """
    start_time = time.time()
    while True:
        # 每隔一秒檢查一次協(xié)程池隊列是否為空
        await asyncio.sleep(1)
        if threads_queue.empty() is True:
            log.info(f'所有協(xié)程已經(jīng)關閉,關閉loop')
            break
    # 如果協(xié)程池隊列已經(jīng)沒有協(xié)程在運行,則統(tǒng)計時間->保存responses->關閉loop
    log.info(f'獲取成功的響應數(shù)量:{response_queue.qsize()}')
    save_response(response_queue, save_name='response.txt')
    use_time = time.time() - start_time
    log.info(f'總耗時:{str(use_time)}')
    log.info(f'響應已存入response.txt')
    loop = asyncio.get_running_loop()
    loop.stop()
async def run_with_pool(urls, threads=5, task_timeout=5):
    """
    :param urls: list
    :param threads: int
    :param task_timeout: int
    :return: loop
    """
    # 創(chuàng)建一個url隊列,用于協(xié)程提取url
    url_queue = asyncio.Queue()
    # 創(chuàng)建一個coroutines隊列,用于管理協(xié)程
    coros_queue = asyncio.Queue()
    # 創(chuàng)建一個response隊列,用于保存response
    response_queue = asyncio.Queue()
    # 將urls列表加入隊列
    [url_queue.put_nowait(url) for url in urls]
    # 創(chuàng)建協(xié)程池
    coros = [create_one_coroutine(url_queue, coros_queue, response_queue, task_timeout)
             for i in range(0, threads)]
    [asyncio.create_task(coro) for coro in coros]
    # 創(chuàng)建一個監(jiān)視urls協(xié)程的管理協(xié)程, 每1秒對urls協(xié)程隊列進行一次判定,如果urls協(xié)程隊列已空則關閉loop
    asyncio.create_task(if_threads_done(coros_queue, response_queue))
def main():
    tasks = [
                'http://httpbin.org'
            ] * 50
    tasks += [
                 'https://httpbin.org'
             ] * 50
    loop = asyncio.get_event_loop()
    loop.create_task(run_with_pool(tasks, threads=15, task_timeout=3))
    try:
        loop.run_forever()
    finally:
        loop.close()
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()

到此這篇關于Python asyncio的基本使用的文章就介紹到這了,更多相關Python asyncio使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 淺談Django自定義模板標簽template_tags的用處

    淺談Django自定義模板標簽template_tags的用處

    這篇文章主要介紹了淺談Django自定義模板標簽template_tags的用處,具有一定借鑒價值,需要的朋友可以參考下。
    2017-12-12
  • 使用Python刪除PPT中所有超鏈接的操作步驟

    使用Python刪除PPT中所有超鏈接的操作步驟

    在某些PPT使用場景中,比如需要打印幻燈片或者超鏈接已失效時,演示文稿中的超鏈接可能會成為一種干擾,這時我們需要移除PowerPoint演示文稿中的超鏈接,本文將介紹如何使用Python刪除PowerPoint演示文稿中的所有超鏈接,需要的朋友可以參考下
    2024-08-08
  • 如何利用Boost.Python實現(xiàn)Python C/C++混合編程詳解

    如何利用Boost.Python實現(xiàn)Python C/C++混合編程詳解

    這篇文章主要給大家介紹了關于如何利用Boost.Python實現(xiàn)Python C/C++混合編程的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起看看吧
    2018-11-11
  • Python使用PIL.image保存圖片

    Python使用PIL.image保存圖片

    PIL庫支持圖像存儲、顯示和處理,它能夠處理幾乎所有圖片格式,可以完成對圖像的縮放、剪裁、疊加以及向圖像添加線條、圖像和文字等操作,下面這篇文章主要給大家介紹了關于Python使用PIL.image保存圖片的相關資料,需要的朋友可以參考下
    2022-12-12
  • 利用python實現(xiàn)數(shù)據(jù)分析

    利用python實現(xiàn)數(shù)據(jù)分析

    為什么要利用python進行數(shù)據(jù)分析?python擁有一個巨大的活躍的科學計算社區(qū),擁有不斷改良的庫,能夠輕松的集成C,C++,Fortran代碼(Cython項目),可以同時用于研究和原型的構建以及生產(chǎn)系統(tǒng)的構建。
    2017-01-01
  • Python對Tornado請求與響應的數(shù)據(jù)處理

    Python對Tornado請求與響應的數(shù)據(jù)處理

    這篇文章主要介紹了Python對Tornado請求與響應的數(shù)據(jù)處理,需要的朋友可以參考下
    2020-02-02
  • 使用python向MongoDB插入時間字段的操作

    使用python向MongoDB插入時間字段的操作

    這篇文章主要介紹了使用python向MongoDB插入時間字段的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-05-05
  • Python3.5實現(xiàn)的三級菜單功能示例

    Python3.5實現(xiàn)的三級菜單功能示例

    這篇文章主要介紹了Python3.5實現(xiàn)的三級菜單功能,涉及Python針對json格式數(shù)據(jù)的讀取、遍歷、查找、判斷等相關操作技巧,需要的朋友可以參考下
    2019-03-03
  • python web框架 django wsgi原理解析

    python web框架 django wsgi原理解析

    這篇文章主要介紹了python web框架 django wsgi原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值
    2019-08-08
  • 編寫同時兼容Python2.x與Python3.x版本的代碼的幾個示例

    編寫同時兼容Python2.x與Python3.x版本的代碼的幾個示例

    這篇文章主要介紹了編寫同時兼容Python2.x與Python3.x版本的代碼的幾個示例,在Python2.7.x的更新中由于采用了某些Python3中的代碼編寫特性、使得在有些原本不同之處編寫兼容性代碼成為可能,需要的朋友可以參考下
    2015-03-03

最新評論