欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python并發(fā)2之使用asyncio處理并發(fā)

 更新時(shí)間:2017年12月21日 10:45:47   作者:goodspeed  
本篇文章主要介紹了python并發(fā)2之使用asyncio處理并發(fā),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

asyncio

在Python 2的時(shí)代,高性能的網(wǎng)絡(luò)編程主要是使用Twisted、Tornado和Gevent這三個(gè)庫(kù),但是它們的異步代碼相互之間既不兼容也不能移植。如上一節(jié)說(shuō)的,Gvanrossum希望在Python 3 實(shí)現(xiàn)一個(gè)原生的基于生成器的協(xié)程庫(kù),其中直接內(nèi)置了對(duì)異步IO的支持,這就是asyncio,它在Python 3.4被引入到標(biāo)準(zhǔn)庫(kù)。

asyncio 這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。

asyncio 包在引入標(biāo)準(zhǔn)庫(kù)之前代號(hào) “Tulip”(郁金香),所以在網(wǎng)上搜索資料時(shí),會(huì)經(jīng)??吹竭@種花的名字。

什么是事件循環(huán)?

wiki 上說(shuō):事件循環(huán)是”一種等待程序分配事件或者消息的編程架構(gòu)“?;旧蟻?lái)說(shuō)事件循環(huán)就是:”當(dāng)A發(fā)生時(shí),執(zhí)行B"?;蛘哂米詈?jiǎn)單的例子來(lái)解釋這一概念就是每個(gè)瀏覽器中都存在的JavaScript事件循環(huán)。當(dāng)你點(diǎn)擊了某個(gè)東西(“當(dāng)A發(fā)生時(shí)”),這一點(diǎn)擊動(dòng)作會(huì)發(fā)送給JavaScript的事件循環(huán),并檢查是否存在注冊(cè)過(guò)的onclick 回調(diào)來(lái)處理這一點(diǎn)擊(執(zhí)行B)。只要有注冊(cè)過(guò)的回調(diào)函數(shù)就會(huì)伴隨點(diǎn)擊動(dòng)作的細(xì)節(jié)信息被執(zhí)行。事件循環(huán)被認(rèn)為是一種虛幻是因?yàn)樗煌5氖謾C(jī)事件并通過(guò)循環(huán)來(lái)發(fā)如何應(yīng)對(duì)這些事件。

對(duì) Python 來(lái)說(shuō),用來(lái)提供事件循環(huán)的 asyncio 被加入標(biāo)準(zhǔn)庫(kù)中。asyncio 重點(diǎn)解決網(wǎng)絡(luò)服務(wù)中的問(wèn)題,事件循環(huán)在這里將來(lái)自套接字(socket)的 I/O 已經(jīng)準(zhǔn)備好讀和/或?qū)懽鳛椤爱?dāng)A發(fā)生時(shí)”(通過(guò)selectors模塊)。除了 GUI 和 I/O,事件循環(huán)也經(jīng)常用于在別的線程或子進(jìn)程中執(zhí)行代碼,并將事件循環(huán)作為調(diào)節(jié)機(jī)制(例如,合作式多任務(wù))。如果你恰好理解 Python 的 GIL,事件循環(huán)對(duì)于需要釋放 GIL 的地方很有用。

線程與協(xié)程

我們先看兩斷代碼,分別用 threading 模塊和asyncio 包實(shí)現(xiàn)的一段代碼。

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # 這個(gè)類(lèi)定義一個(gè)可變對(duì)象,用于從外部控制線程
 go = True

def spin(msg, signal): # 這個(gè)函數(shù)會(huì)在單獨(dú)的線程中運(yùn)行,signal 參數(shù)是前邊定義的Signal類(lèi)的實(shí)例
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函數(shù)從指定的序列中反復(fù)不斷地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光標(biāo)移回行首
  time.sleep(.1) # 每 0.1 秒刷新一次
  if not signal.go: # 如果 go屬性不是 True,退出循環(huán)
   break

 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態(tài)消息,把光標(biāo)移回開(kāi)頭

def slow_function(): # 模擬耗時(shí)操作
 # 假裝等待I/O一段時(shí)間
 time.sleep(3) # 調(diào)用sleep 會(huì)阻塞主線程,這么做事為了釋放GIL,創(chuàng)建從屬線程
 return 42

def supervisor(): # 這個(gè)函數(shù)設(shè)置從屬線程,顯示線程對(duì)象,運(yùn)行耗時(shí)計(jì)算,最后殺死進(jìn)程
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # 顯示線程對(duì)象 輸出 spinner object: <Thread(Thread-1, initial)>
 spinner.start() # 啟動(dòng)從屬進(jìn)程
 result = slow_function() # 運(yùn)行slow_function 行數(shù),阻塞主線程。同時(shí)叢書(shū)線程以動(dòng)畫(huà)形式旋轉(zhuǎn)指針
 signal.go = False
 spinner.join() # 等待spinner 線程結(jié)束
 return result
def main():
 result = supervisor() 
 print('Answer', result)
if __name__ == '__main__':
 main()

執(zhí)行一下,結(jié)果大致是這個(gè)樣子:

這是一個(gè)動(dòng)圖,“thinking" 前的 線是會(huì)動(dòng)的(為了錄屏,我把sleep 的時(shí)間調(diào)大了)

python 并沒(méi)有提供終止線程的API,所以若想關(guān)閉線程,必須給線程發(fā)送消息。這里我們使用signal.go 屬性:在主線程中把它設(shè)置為False后,spinner 線程會(huì)接收到,然后退出

現(xiàn)在我們?cè)倏聪率褂?asyncio 包的版本:

# spinner_asyncio.py
# 通過(guò)協(xié)程以動(dòng)畫(huà)的形式顯示文本式旋轉(zhuǎn)指針
import asyncio
import itertools
import sys
@asyncio.coroutine # 打算交給asyncio 處理的協(xié)程要使用 @asyncio.coroutine 裝飾
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函數(shù)從指定的序列中反復(fù)不斷地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光標(biāo)移回行首
  try:
   yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會(huì)阻塞事件循環(huán)
  except asyncio.CancelledError: # 如果 spin 函數(shù)蘇醒后拋出 asyncio.CancelledError 異常,其原因是發(fā)出了取消請(qǐng)求
   break

 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態(tài)消息,把光標(biāo)移回開(kāi)頭

@asyncio.coroutine
def slow_function(): # 5 現(xiàn)在此函數(shù)是協(xié)程,使用休眠假裝進(jìn)行I/O 操作時(shí),使用 yield from 繼續(xù)執(zhí)行事件循環(huán)
 # 假裝等待I/O一段時(shí)間
 yield from asyncio.sleep(3) # 此表達(dá)式把控制權(quán)交給主循環(huán),在休眠結(jié)束后回復(fù)這個(gè)協(xié)程
 return 42

@asyncio.coroutine
def supervisor(): #這個(gè)函數(shù)也是協(xié)程,因此可以使用 yield from 驅(qū)動(dòng) slow_function
 spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函數(shù)排定協(xié)程的運(yùn)行時(shí)間,使用一個(gè) Task 對(duì)象包裝spin 協(xié)程,并立即返回
 print('spinner object:', spinner) # Task 對(duì)象,輸出類(lèi)似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # 驅(qū)動(dòng)slow_function() 函數(shù),結(jié)束后,獲取返回值。同事事件循環(huán)繼續(xù)運(yùn)行,
 # 因?yàn)閟low_function 函數(shù)最后使用yield from asyncio.sleep(3) 表達(dá)式把控制權(quán)交給主循環(huán)
 result = yield from slow_function()
 # Task 對(duì)象可以取消;取消后會(huì)在協(xié)程當(dāng)前暫停的yield處拋出 asyncio.CancelledError 異常
 # 協(xié)程可以捕獲這個(gè)異常,也可以延遲取消,甚至拒絕取消
 spinner.cancel()
 return result

def main():
 loop = asyncio.get_event_loop() # 獲取事件循環(huán)引用
 # 驅(qū)動(dòng)supervisor 協(xié)程,讓它運(yùn)行完畢;這個(gè)協(xié)程的返回值是這次調(diào)用的返回值
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Answer', result)
if __name__ == '__main__':
 main()

除非想阻塞主線程,從而凍結(jié)事件循環(huán)或整個(gè)應(yīng)用,否則不要再 asyncio 協(xié)程中使用 time.sleep().

如果協(xié)程需要在一段時(shí)間內(nèi)什么都不做,應(yīng)該使用 yield from asyncio.sleep(DELAY)

使用 @asyncio.coroutine 裝飾器不是強(qiáng)制要求,但建議這么做因?yàn)檫@樣能在代碼中突顯協(xié)程,如果還沒(méi)從中產(chǎn)出值,協(xié)程就把垃圾回收了(意味著操作未完成,可能有缺陷),可以發(fā)出警告。這個(gè)裝飾器不會(huì)預(yù)激協(xié)程。

這兩段代碼的執(zhí)行結(jié)果基本相同,現(xiàn)在我們看一下兩段代碼的核心代碼 supervisor 主要區(qū)別:

  1. asyncio.Task 對(duì)象差不多與 threading.Thread 對(duì)象等效(Task 對(duì)象像是實(shí)現(xiàn)寫(xiě)作時(shí)多任務(wù)的庫(kù)中的綠色線程
  2. Task 對(duì)象用于驅(qū)動(dòng)協(xié)程,Thread 對(duì)象用于調(diào)用可調(diào)用的對(duì)象
  3. Task 對(duì)象不由自己動(dòng)手實(shí)例化,而是通過(guò)把協(xié)程傳給 asyncio.async(...) 函數(shù)或 loop.create_task(...) 方法獲取
  4. 獲取的Task 對(duì)象已經(jīng)排定了運(yùn)行時(shí)間;Thread 實(shí)例必須調(diào)用start方法,明確告知它運(yùn)行
  5. 在線程版supervisor函數(shù)中,slow_function 是普通的函數(shù),由線程直接調(diào)用,而異步版的slow_function 函數(shù)是協(xié)程,由yield from 驅(qū)動(dòng)。
  6. 沒(méi)有API能從外部終止線程,因?yàn)榫€程隨時(shí)可能被中斷。而如果想終止任務(wù),可以使用Task.cancel() 實(shí)例方法,在協(xié)程內(nèi)部拋出CancelledError 異常。協(xié)程可以在暫停的yield 處捕獲這個(gè)異常,處理終止請(qǐng)求
  7. supervisor 協(xié)程必須在main 函數(shù)中由loop.run_until_complete 方法執(zhí)行。

協(xié)程和線程相比關(guān)鍵的一個(gè)優(yōu)點(diǎn)是,線程必須記住保留鎖,去保護(hù)程序中的重要部分,防止多步操作再執(zhí)行的過(guò)程中中斷,防止山水處于于曉狀態(tài)協(xié)程默認(rèn)會(huì)做好保護(hù),我們必須顯式產(chǎn)出(使用yield 或 yield from 交出控制權(quán))才能讓程序的余下部分運(yùn)行。

asyncio.Future:故意不阻塞

asynci.Future 類(lèi)與 concurrent.futures.Future 類(lèi)的接口基本一致,不過(guò)實(shí)現(xiàn)方式不同,不可互換。

上一篇[python并發(fā) 1:使用 futures 處理并發(fā)]()我們介紹過(guò) concurrent.futures.Future 的 future,在 concurrent.futures.Future 中,future只是調(diào)度執(zhí)行某物的結(jié)果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個(gè)協(xié)程,排定它的運(yùn)行時(shí)間,然后返回一個(gè)asyncio.Task 實(shí)例(也是asyncio.Future 類(lèi)的實(shí)例,因?yàn)?Task 是 Future 的子類(lèi),用于包裝協(xié)程。(在 concurrent.futures.Future 中,類(lèi)似的操作是Executor.submit(...))。

與concurrent.futures.Future 類(lèi)似,asyncio.Future 類(lèi)也提供了

  1. .done() 返回布爾值,表示Future 是否已經(jīng)執(zhí)行
  2. .add_done_callback() 這個(gè)方法只有一個(gè)參數(shù),類(lèi)型是可調(diào)用對(duì)象,F(xiàn)uture運(yùn)行結(jié)束后會(huì)回調(diào)這個(gè)對(duì)象。
  3. .result() 這個(gè)方法沒(méi)有參數(shù),因此不能指定超時(shí)時(shí)間。 如果調(diào)用 .result() 方法時(shí)期還沒(méi)有運(yùn)行完畢,會(huì)拋出asyncio.InvalidStateError 異常。

對(duì)應(yīng)的 concurrent.futures.Future 類(lèi)中的 Future 運(yùn)行結(jié)束后調(diào)用result(), 會(huì)返回可調(diào)用對(duì)象的結(jié)果或者拋出執(zhí)行可調(diào)用對(duì)象時(shí)拋出的異常,如果是 Future 沒(méi)有運(yùn)行結(jié)束時(shí)調(diào)用 f.result()方法,這時(shí)會(huì)阻塞調(diào)用方所在的線程,直到有結(jié)果返回。此時(shí)result 方法還可以接收 timeout 參數(shù),如果在指定的時(shí)間內(nèi) Future 沒(méi)有運(yùn)行完畢,會(huì)拋出 TimeoutError 異常。

我們使用asyncio.Future 時(shí), 通常使用yield from,從中獲取結(jié)果,而不是使用 result()方法 yield from 表達(dá)式在暫停的協(xié)程中生成返回值,回復(fù)執(zhí)行過(guò)程。

asyncio.Future 類(lèi)的目的是與 yield from 一起使用,所以通常不需要使用以下方法:

  1. 不需調(diào)用 my_future.add_down_callback(...), 因?yàn)榭梢灾苯影严朐?future 運(yùn)行結(jié)束后的操作放在協(xié)程中 yield from my_future 表達(dá)式的后邊。(因?yàn)閰f(xié)程可以暫停和恢復(fù)函數(shù))
  2. 無(wú)需調(diào)用 my_future.result(), 因?yàn)?yield from 產(chǎn)生的結(jié)果就是(result = yield from my_future)

在 asyncio 包中,可以使用yield from 從asyncio.Future 對(duì)象中產(chǎn)出結(jié)果。這也就意味著我們可以這么寫(xiě):

res = yield from foo() # foo 可以是協(xié)程函數(shù),也可以是返回 Future 或 task 實(shí)例的普通函數(shù)

asyncio.async(...)* 函數(shù)

asyncio.async(coro_or_future, *, loop=None)

這個(gè)函數(shù)統(tǒng)一了協(xié)程和Future: 第一個(gè)參數(shù)可以是二者中的任意一個(gè)。如果是Future 或者 Task 對(duì)象,就直接返回,如果是協(xié)程,那么async 函數(shù)會(huì)自動(dòng)調(diào)用 loop.create_task(...) 方法創(chuàng)建 Task 對(duì)象。 loop 參數(shù)是可選的,用于傳入事件循環(huán); 如果沒(méi)有傳入,那么async函數(shù)會(huì)通過(guò)調(diào)用asyncio.get_event_loop() 函數(shù)獲取循環(huán)對(duì)象。

BaseEventLoop.create_task(coro)

這個(gè)方法排定協(xié)程的執(zhí)行時(shí)間,返回一個(gè) asyncio.Task 對(duì)象。如果在自定義的BaseEventLoop 子類(lèi)上調(diào)用,返回的對(duì)象可能是外部庫(kù)中與Task類(lèi)兼容的某個(gè)類(lèi)的實(shí)例。

BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(...)函數(shù)。
如果想在Python控制臺(tái)或者小型測(cè)試腳本中實(shí)驗(yàn)future和協(xié)程,可以使用下面的片段:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

使用asyncio 和 aiohttp 包下載

現(xiàn)在,我們了解了asyncio 的基礎(chǔ)知識(shí),是時(shí)候使用asyncio 來(lái)重寫(xiě)我們 上一篇 [python并發(fā) 1:使用 futures 處理并發(fā)]() 下載國(guó)旗的腳本了。

先看一下代碼:

import asyncio
import aiohttp # 需要pip install aiohttp
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # 我們知道,協(xié)程應(yīng)該使用 asyncio.coroutine 裝飾
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # 阻塞的操作通過(guò)協(xié)程實(shí)現(xiàn),客戶代碼通過(guò)yield from 把指責(zé)委托給協(xié)程,以便異步操作
 resp = yield from aiohttp.request('GET', url) 
 # 讀取也是異步操作
 image = yield from resp.read()
 return image

@asyncio.coroutine
def download_one(cc): # 這個(gè)函數(shù)也必須是協(xié)程,因?yàn)橛玫搅藋ield from
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower() + '.gif')
 return cc

def download_many(cc_list):
 loop = asyncio.get_event_loop() # 獲取事件序號(hào)底層實(shí)現(xiàn)的引用
 to_do = [download_one(cc) for cc in sorted(cc_list)] # 調(diào)用download_one 獲取各個(gè)國(guó)旗,構(gòu)建一個(gè)生成器對(duì)象列表
 # 雖然函數(shù)名稱(chēng)是wait 但它不是阻塞型函數(shù),wait 是一個(gè)協(xié)程,等傳給他的所有協(xié)程運(yùn)行完畢后結(jié)束
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # 執(zhí)行事件循環(huán),知道wait_coro 運(yùn)行結(jié)束;事件循環(huán)運(yùn)行的過(guò)程中,這個(gè)腳本會(huì)在這里阻塞。
 loop.close() # 關(guān)閉事件循環(huán)
 return len(res)
if __name__ == '__main__':
 main(download_many)

這段代碼的運(yùn)行簡(jiǎn)述如下:

  1. 在download_many 函數(shù)獲取一個(gè)事件循環(huán),處理調(diào)用download_one 函數(shù)生成的幾個(gè)協(xié)程對(duì)象
  2. asyncio 事件循環(huán)一次激活各個(gè)協(xié)程
  3. 客戶代碼中的協(xié)程(get_flag)使用 yield from 把指責(zé)委托給庫(kù)里的協(xié)程(aiohttp.request)時(shí),控制權(quán)交還給事件循環(huán),執(zhí)行之前排定的協(xié)程
  4. 事件循環(huán)通過(guò)基于回調(diào)的底層API,在阻塞的操作執(zhí)行完畢后獲得通知。
  5. 獲得通知后,主循環(huán)把結(jié)果發(fā)給暫停的協(xié)程
  6. 協(xié)程向前執(zhí)行到下一個(gè)yield from 表達(dá)式,例如 get_flag 函數(shù)的yield from resp.read()。事件循環(huán)再次得到控制權(quán),重復(fù)第4~6步,直到循環(huán)終止。

download_many 函數(shù)中,我們使用了 asyncio.wait(...) 函數(shù),這個(gè)函數(shù)是一個(gè)協(xié)程,協(xié)程的參數(shù)是一個(gè)由future或者協(xié)程構(gòu)成的可迭代對(duì)象;wait 會(huì)分別把各個(gè)協(xié)程包裝進(jìn)一個(gè)Task對(duì)象。最終的結(jié)果是,wait 處理的所有對(duì)象都通過(guò)某種方式變成Future 類(lèi)的實(shí)例。

wait 是協(xié)程函數(shù),因此,返回的是一個(gè)協(xié)程或者生成器對(duì)象;waite_coro 變量中存儲(chǔ)的就是這種對(duì)象

loop.run_until_complete 方法的參數(shù)是一個(gè)future 或協(xié)程。如果是協(xié)程,run_until_complete 方法與 wait 函數(shù)一樣,把協(xié)程包裝進(jìn)一個(gè)Task 對(duì)象中。這里 run_until_complete 方法把 wait_coro 包裝進(jìn)一個(gè)Task 對(duì)象中,由yield from 驅(qū)動(dòng)。wait_coro 運(yùn)行結(jié)束后返回兩個(gè)參數(shù),第一個(gè)參數(shù)是結(jié)束的future 第二個(gè)參數(shù)是未結(jié)束的future。

<section class="caption">wait</section>有兩個(gè)命名參數(shù),timeout 和 return_when 如果設(shè)置了可能會(huì)返回未結(jié)束的future。

有一點(diǎn)你可能也注意到了,我們重寫(xiě)了get_flags 函數(shù),是因?yàn)橹坝玫降?requests 庫(kù)執(zhí)行的是阻塞型I/O操作。為了使用 asyncio 包,我們必須把函數(shù)改成異步版。

小技巧

如果你覺(jué)得 使用了協(xié)程后代碼難以理解,可以采用 Python之父(Guido van Rossum)的建議,假裝沒(méi)有yield from。

已上邊這段代碼為例:

@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# 把yield form 去掉
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image

# 現(xiàn)在是不是清晰多了

知識(shí)點(diǎn)

在asyncio 包的API中使用 yield from 時(shí),有個(gè)細(xì)節(jié)要注意:

使用asyncio包時(shí),我們編寫(xiě)的異步代碼中包含由asyncio本身驅(qū)動(dòng)的協(xié)程(委派生成器),而生成器最終把指責(zé)委托給asyncio包或者第三方庫(kù)中的協(xié)程。這種處理方式相當(dāng)于架起了管道,讓asyncio事件循環(huán)驅(qū)動(dòng)執(zhí)行底層異步I/O的庫(kù)函數(shù)。

避免阻塞型調(diào)用

我們先看一個(gè)圖,這個(gè)圖顯示了電腦從不同存儲(chǔ)介質(zhì)中讀取數(shù)據(jù)的延遲情況:

通過(guò)這個(gè)圖,我們可以看到,阻塞型調(diào)用對(duì)于CPU來(lái)說(shuō)是巨大的浪費(fèi)。有什么辦法可以避免阻塞型調(diào)用中止整個(gè)應(yīng)用程序么?

有兩種方法:

  1. 在單獨(dú)的線程中運(yùn)行各個(gè)阻塞型操作
  2. 把每個(gè)阻塞型操作轉(zhuǎn)化成非阻塞的異步調(diào)用使用

當(dāng)然我們推薦第二種方案,因?yàn)榈谝环N方案中如果每個(gè)連接都使用一個(gè)線程,成本太高。

第二種我們可以使用把生成器當(dāng)做協(xié)程使用的方式實(shí)現(xiàn)異步編程。對(duì)事件循環(huán)來(lái)說(shuō),調(diào)用回調(diào)與在暫停的協(xié)程上調(diào)用 .send() 方法效果差不多。各個(gè)暫停的協(xié)程消耗的內(nèi)存比線程小的多。

現(xiàn)在,你應(yīng)該能理解為什么 flags_asyncio.py 腳本比 flags.py 快的多了吧。

因?yàn)閒lags.py 是依次同步下載,每次下載都要用幾十億個(gè)CPU周期等待結(jié)果。而在flags_asyncio.py中,在download_many 函數(shù)中調(diào)用loop.run_until_complete 方法時(shí),事件循環(huán)驅(qū)動(dòng)各個(gè)download_one 協(xié)程,運(yùn)行到y(tǒng)ield from 表達(dá)式出,那個(gè)表達(dá)式又驅(qū)動(dòng)各個(gè) get_flag 協(xié)程,運(yùn)行到第一個(gè)yield from 表達(dá)式處,調(diào)用 aiohttp.request()函數(shù)。這些調(diào)用不會(huì)阻塞,因此在零點(diǎn)幾秒內(nèi)所有請(qǐng)求都可以全部開(kāi)始。

改進(jìn) asyncio 下載腳本

現(xiàn)在我們改進(jìn)一下上邊的 flags_asyncio.py,在其中添加上異常處理,計(jì)數(shù)器

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')

# 自定義異常用于包裝其他HTTP貨網(wǎng)絡(luò)異常,并獲取country_code,以便報(bào)告錯(cuò)誤
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code

@asyncio.coroutine
def get_flag(cc):
 # 此協(xié)程有三種返回結(jié)果:
 # 1. 返回下載到的圖片
 # 2. HTTP 響應(yīng)為404 時(shí),拋出web.HTTPNotFound 異常
 # 3. 返回其他HTTP狀態(tài)碼時(shí), 拋出aiohttp.HttpProcessingError
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )

@asyncio.coroutine
def download_one(cc, semaphore):
 # semaphore 參數(shù)是 asyncio.Semaphore 類(lèi)的實(shí)例
 # Semaphore 類(lèi)是同步裝置,用于限制并發(fā)請(qǐng)求
 try:
  with (yield from semaphore):
    # 在yield from 表達(dá)式中把semaphore 當(dāng)成上下文管理器使用,防止阻塞整個(gè)系統(tǒng)
    # 如果semaphore 計(jì)數(shù)器的值是所允許的最大值,只有這個(gè)協(xié)程會(huì)阻塞
    image = yield from get_flag(cc)
    # 退出with語(yǔ)句后 semaphore 計(jì)數(shù)器的值會(huì)遞減,
    # 解除阻塞可能在等待同一個(gè)semaphore對(duì)象的其他協(xié)程實(shí)例
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower() + '.gif')
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # 創(chuàng)建一個(gè) asyncio.Semaphore 實(shí)例,最多允許激活MAX_CONCUR_REQ個(gè)使用這個(gè)計(jì)數(shù)器的協(xié)程
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # 多次調(diào)用 download_one 協(xié)程,創(chuàng)建一個(gè)協(xié)程對(duì)象列表
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # 獲取一個(gè)迭代器,這個(gè)迭代器會(huì)在future運(yùn)行結(jié)束后返回future
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # 迭代允許結(jié)束的 future 
  try:
   res = yield from future # 獲取asyncio.Future 對(duì)象的結(jié)果(也可以調(diào)用future.result)
  except FetchError as exc:
   # 拋出的異常都包裝在FetchError 對(duì)象里
   country_code = exc.country_code
   try:
    # 嘗試從原來(lái)的異常 (__cause__)中獲取錯(cuò)誤消息
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # 如果在原來(lái)的異常中找不到錯(cuò)誤消息,使用所連接異常的類(lèi)名作為錯(cuò)誤消息
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Error for {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter

def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

由于協(xié)程發(fā)起的請(qǐng)求速度較快,為了防止向服務(wù)器發(fā)起太多的并發(fā)請(qǐng)求,使服務(wù)器過(guò)載,我們?cè)赿ownload_coro 函數(shù)中創(chuàng)建一個(gè)asyncio.Semaphore 實(shí)例,然后把它傳給download_one 函數(shù)。

<secion class="caption">Semaphore</section> 對(duì)象維護(hù)著一個(gè)內(nèi)部計(jì)數(shù)器,若在對(duì)象上調(diào)用 .acquire() 協(xié)程方法,計(jì)數(shù)器則遞減;若在對(duì)象上調(diào)用 .release() 協(xié)程方法,計(jì)數(shù)器則遞增。計(jì)數(shù)器的值是在初始化的時(shí)候設(shè)定。

如果計(jì)數(shù)器大于0,那么調(diào)用 .acquire() 方法不會(huì)阻塞,如果計(jì)數(shù)器為0, .acquire() 方法會(huì)阻塞調(diào)用這個(gè)方法的協(xié)程,直到其他協(xié)程在同一個(gè) Semaphore 對(duì)象上調(diào)用 .release() 方法,讓計(jì)數(shù)器遞增。

在上邊的代碼中,我們并沒(méi)有手動(dòng)調(diào)用 .acquire() 或 .release() 方法,而是在 download_one 函數(shù)中 把 semaphore 當(dāng)做上下文管理器使用:

with (yield from semaphore):
 image = yield from get_flag(cc)

這段代碼保證,任何時(shí)候都不會(huì)有超過(guò) MAX_CONCUR_REQ 個(gè) get_flag 協(xié)程啟動(dòng)。

使用 asyncio.as_completed 函數(shù)

因?yàn)橐褂?yield from 獲取 asyncio.as_completed 函數(shù)產(chǎn)出的future的結(jié)果,所以 as_completed 函數(shù)秩序在協(xié)程中調(diào)用。由于 download_many 要作為參數(shù)傳給非協(xié)程的main 函數(shù),我已我們添加了一個(gè)新的 downloader_coro 協(xié)程,讓download_many 函數(shù)只用于設(shè)置事件循環(huán)。

使用Executor 對(duì)象,防止阻塞事件循環(huán)

現(xiàn)在我們回去看下上邊關(guān)于電腦從不同存儲(chǔ)介質(zhì)讀取數(shù)據(jù)的延遲情況圖,有一個(gè)實(shí)時(shí)需要注意,那就是訪問(wèn)本地文件系統(tǒng)也會(huì)阻塞。

上邊的代碼中,save_flag 函數(shù)阻塞了客戶代碼與 asyncio 事件循環(huán)公用的唯一線程,因此保存文件時(shí),整個(gè)應(yīng)用程序都會(huì)暫停。為了避免這個(gè)問(wèn)題,可以使用事件循環(huán)對(duì)象的 run_in_executor 方法。

asyncio 的事件循環(huán)在后臺(tái)維護(hù)著一個(gè)ThreadPoolExecutor 對(duì)象,我們可以調(diào)用 run_in_executor 方法,把可調(diào)用的對(duì)象發(fā)給它執(zhí)行。

下邊是我們改動(dòng)后的代碼:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # 這里是改動(dòng)部分
  loop = asyncio.get_event_loop() # 獲取事件循環(huán)的引用
  loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

run_in_executor 方法的第一個(gè)參數(shù)是Executor 實(shí)例;如果設(shè)為None,使用事件循環(huán)的默認(rèn) ThreadPoolExecutor 實(shí)例。

從回調(diào)到future到協(xié)程

在接觸協(xié)程之前,我們可能對(duì)回調(diào)有一定的認(rèn)識(shí),那么和回調(diào)相比,協(xié)程有什么改進(jìn)呢?

python中的回調(diào)代碼樣式:

def stage1(response1):
 request2 = step1(response1)
 api_call2(request2, stage2)
 
def stage2(response2):
 request3 = step3(response3)
 api_call3(request3, stage3) 

 def stage3(response3):
  step3(response3) 
api_call1(request1, stage1)

上邊的代碼的缺陷:

  1. 容易出現(xiàn)回調(diào)地獄
  2. 代碼難以閱讀

在這個(gè)問(wèn)題上,協(xié)程能發(fā)揮很大的作用。如果換成協(xié)程和yield from 結(jié)果做的異步代碼,代碼示例如下:

@asyncio.coroutine
def three_stages(request1):
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
 
loop.create_task(three_stages(request1)

和之前的代碼相比,這個(gè)代碼就容易理解多了。如果異步調(diào)用 api_call1,api_call2,api_call3 會(huì)拋出異常,那么可以把相應(yīng)的 yield from 表達(dá)式放在 try/except 塊中處理異常。

使用協(xié)程必須習(xí)慣 yield from 表達(dá)式,并且協(xié)程不能直接調(diào)用,必須顯式的排定協(xié)程的執(zhí)行時(shí)間,或在其他排定了執(zhí)行時(shí)間的協(xié)程中使用yield from 表達(dá)式吧它激活。如果不使用 loop.create_task(three_stages(request1)),那么什么都不會(huì)發(fā)生。

下面我們用一個(gè)實(shí)際的例子來(lái)演示一下:

每次下載發(fā)起多次請(qǐng)求

我們修改一下上邊下載國(guó)旗的代碼,使在下載國(guó)旗的同時(shí)還可以獲取國(guó)家名稱(chēng)在保存圖片的時(shí)候使用。
我們使用協(xié)程和yield from 解決這個(gè)問(wèn)題:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  if 'json' in ctype or url.endswith('json'):
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)

@asyncio.coroutine
def get_country(cc):
 url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']

@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

在這段代碼中,我們?cè)赿ownload_one 函數(shù)中分別在 semaphore 控制的兩個(gè)with 塊中調(diào)用get_flag 和 get_country,是為了節(jié)約時(shí)間。

get_flag 的return 語(yǔ)句在外層加上括號(hào),是因?yàn)?) 的運(yùn)算符優(yōu)先級(jí)高,會(huì)先執(zhí)行括號(hào)內(nèi)的yield from 語(yǔ)句 返回的結(jié)果。如果不加 會(huì)報(bào)句法錯(cuò)誤

加() ,相當(dāng)于

image = yield from http_get(url)
return image

如果不加(),那么程序會(huì)在 yield from 處中斷,交出控制權(quán),這時(shí)使用return 會(huì)報(bào)句法錯(cuò)誤。

總結(jié)

這一篇我們討論了:

  1. 對(duì)比了一個(gè)多線程程序和asyncio版,說(shuō)明了多線程和異步任務(wù)之間的關(guān)系
  2. 比較了 asyncio.Future 類(lèi) 和 concurrent.futures.Future 類(lèi)的區(qū)別
  3. 如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)
  4. 在異步編程中,與回調(diào)相比,協(xié)程顯著提升性能的方式

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 利用pyuic5將ui文件轉(zhuǎn)換為py文件的方法

    利用pyuic5將ui文件轉(zhuǎn)換為py文件的方法

    今天小編就為大家分享一篇利用pyuic5將ui文件轉(zhuǎn)換為py文件的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • Python利用format函數(shù)實(shí)現(xiàn)對(duì)齊打印(左對(duì)齊、右對(duì)齊與居中對(duì)齊)

    Python利用format函數(shù)實(shí)現(xiàn)對(duì)齊打印(左對(duì)齊、右對(duì)齊與居中對(duì)齊)

    format是字符串內(nèi)嵌的一個(gè)方法,用于格式化字符串,下面這篇文章主要給大家介紹了關(guān)于Python利用format函數(shù)實(shí)現(xiàn)對(duì)齊打印(左對(duì)齊、右對(duì)齊與居中對(duì)齊)的相關(guān)資料,需要的朋友可以參考下
    2022-04-04
  • python格式的Caffe圖片數(shù)據(jù)均值計(jì)算學(xué)習(xí)

    python格式的Caffe圖片數(shù)據(jù)均值計(jì)算學(xué)習(xí)

    這篇文章主要為大家介紹了python格式的Caffe圖片數(shù)據(jù)均值計(jì)算學(xué)習(xí)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • Ruby使用eventmachine為HTTP服務(wù)器添加文件下載功能

    Ruby使用eventmachine為HTTP服務(wù)器添加文件下載功能

    這篇文章主要介紹了Ruby使用eventmachine為HTTP服務(wù)器添加文件下載功能的實(shí)例,同時(shí)作者也分享了Windows上eventmachine安裝報(bào)錯(cuò)問(wèn)題的解決方法,需要的朋友可以參考下
    2016-04-04
  • OpenCV中resize函數(shù)插值算法的實(shí)現(xiàn)過(guò)程(五種)

    OpenCV中resize函數(shù)插值算法的實(shí)現(xiàn)過(guò)程(五種)

    最新版OpenCV2.4.7中,cv::resize函數(shù)有五種插值算法:最近鄰、雙線性、雙三次、基于像素區(qū)域關(guān)系、蘭索斯插值。感興趣的可以了解一下
    2021-06-06
  • Python切片索引用法示例

    Python切片索引用法示例

    這篇文章主要介紹了Python切片索引用法,結(jié)合實(shí)例形式詳細(xì)分析了Python切片索引的常見(jiàn)使用方法與操作注意事項(xiàng),需要的朋友可以參考下
    2018-05-05
  • Django渲染Markdown文章目錄的方法示例

    Django渲染Markdown文章目錄的方法示例

    這篇文章主要介紹了Django渲染Markdown文章目錄的方法示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • python操作cfg配置文件方式

    python操作cfg配置文件方式

    今天小編就為大家分享一篇python操作cfg配置文件方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-12-12
  • Python如何判斷字符串是否僅包含數(shù)字

    Python如何判斷字符串是否僅包含數(shù)字

    在用Python進(jìn)行數(shù)據(jù)處理的時(shí)候,經(jīng)常會(huì)遇到DataFrame中的某一列本應(yīng)該是數(shù)值類(lèi)型,但由于數(shù)據(jù)不規(guī)范導(dǎo)致在字段中夾雜了非數(shù)值類(lèi)型,本文就介紹了Python如何判斷字符串是否僅包含數(shù)字,感興趣的可以了解一下
    2022-03-03
  • 基于Python實(shí)現(xiàn)隨機(jī)點(diǎn)名系統(tǒng)的示例代碼

    基于Python實(shí)現(xiàn)隨機(jī)點(diǎn)名系統(tǒng)的示例代碼

    在某些難以抉擇得時(shí)候,我們經(jīng)常要用外力來(lái)幫助我們做出選擇,比如,梁山出征方臘前沙場(chǎng)點(diǎn)兵,挑選先鋒的場(chǎng)景。所以本文就來(lái)用Python做個(gè)隨機(jī)點(diǎn)名系統(tǒng)吧,需要的可以參考一下
    2023-04-04

最新評(píng)論