詳解Python如何使用并發(fā)模型編程
關(guān)于什么是并發(fā)模型,我在這里引用 Go 語言聯(lián)合創(chuàng)造者 Rob Pike 的一段話:
并發(fā)是指一次處理多件事。并行是指一次做多件事。二者不同,但是有聯(lián)系。一個關(guān)于結(jié)構(gòu),一個關(guān)于執(zhí)行。并發(fā)用于制定方案,用來解決可能(但未必)并行的問題。
在不涉及并發(fā)概念的情況下,一個單進程單線程的程序執(zhí)行情況可能是這樣的:調(diào)用一個函數(shù),發(fā)出調(diào)用的代碼開始等待函數(shù)執(zhí)行完成,直到函數(shù)返回結(jié)果,如果函數(shù)拋出異常,則可以把調(diào)用函數(shù)的代碼放到 try/except
語句塊中,來捕獲和處理異常。
但是,當涉及到并發(fā)時,情況就沒這么簡單了。在啟用多線程(或多進程)后,你無法在一個線程(或進程)中知道另一個線程(或進程)被調(diào)用的函數(shù)何時執(zhí)行完成,也無法輕松得知函數(shù)調(diào)用結(jié)果或捕獲異常。只能采用某種通知的方式,來進行線程(或進程)間通信,這可能是一個信號,也可能是一個消息隊列等。
本文主要講解如何讓 Python 能夠同時處理多個任務(wù),即如何使用并發(fā)模型編程。
目標
我們將要實現(xiàn)一個旋轉(zhuǎn)指針程序,啟動一個程序,阻塞 3 秒鐘(模擬耗時任務(wù)),在這期間,終端展示字符動畫,讓用戶知道程序仍在執(zhí)行,并沒有停止,3 秒后程序打印耗時任務(wù)的計算結(jié)果并退出。
實現(xiàn)好的程序效果如下:
這有點像下載進度條,因為打印旋轉(zhuǎn)指針和耗時任務(wù)是“同時”進行的,這種場景只能通過并發(fā)模型來實現(xiàn)。
我們將分別使用多線程、多進程以及協(xié)程來實現(xiàn)這個程序,以此來演示 Python 的的并發(fā)模型用法。
多線程版
第一版旋轉(zhuǎn)指針程序使用 Python 多線程來編寫,首先,我們需要定義兩個函數(shù) spin
、slow
分別用來實現(xiàn)旋轉(zhuǎn)指針和模擬耗時任務(wù)(比如從網(wǎng)上下載一個文件)。
import itertools import time from threading import Thread, Event def spin(msg: str, done: Event) -> None: for char in itertools.cycle(r'\|/-'): status = f'\r{char} {msg}' print(status, end='', flush=True) if done.wait(0.1): break blanks = ' ' * len(status) print(f'\r{blanks}\r', end='') def slow() -> int: time.sleep(3) return 42
threading
模塊提供多線程支持,Thread
實例用來管理一個新的線程,Event
可以用來進行線程間通信。
spin
函數(shù)將作為一個任務(wù)在單獨的線程中執(zhí)行,它接收兩個參數(shù) msg
、done
,傳遞進來的 msg
將會跟隨旋轉(zhuǎn)指針一起被打印,done
參數(shù)類型為 threading.Event
,用來實現(xiàn)多個線程間的通信,以此來同步任務(wù)狀態(tài)。
itertools.cycle(r'\|/-')
是一個無限迭代器,一次產(chǎn)出一個字符,不停的迭代。比如用 for
遍歷 itertools.cycle('123')
,將得到無限迭代的數(shù)據(jù) 123123123...
。這里 \|/-
字符不停迭代并被打印,就會產(chǎn)生旋轉(zhuǎn)指針的效果。
打印的 status
字符串第一個字符為 \r
,可以實現(xiàn)將光標移動到行首,這是一個使用文本在控制臺實現(xiàn)動畫的小技巧。
接下來的 done.wait(0.1)
是這個函數(shù)的關(guān)鍵代碼,它是主線程與執(zhí)行當前函數(shù)的子線程之間通信的橋梁。done.wait
方法簽名為 Event.wait(self, timeout=None)
,該方法等待 timeout
指定的時間后返回 False
,我們在這里指定為 0.1 秒。如果在其他線程中使用 Event.set()
設(shè)置了這個事件,則當前線程該方法將立即返回 True
,此時 for
循環(huán)就會被 break
掉。
spin
函數(shù)在退出前,還會打印幾個空格來實現(xiàn)清空當前行打印內(nèi)容的效果,并且最終還將光標移動到行首。
slow
函數(shù)使用 time.sleep(3)
暫停 3 秒,模擬耗時操作,這個函數(shù)將像我們往常編寫的單線程代碼一樣在主線程中執(zhí)行。
接下來我們要編寫多線程代碼來分別調(diào)用 spin
和 slow
兩個函數(shù),完成這個旋轉(zhuǎn)指針程序。
def supervisor() -> int: done = Event() spinner = Thread(target=spin, args=('thinking!', done)) print(f'spinner object: {spinner}') spinner.start() result = slow() done.set() spinner.join() return result def main() -> None: result = supervisor() print(f'Answer: {result}') if __name__ == '__main__': main()
supervisor
函數(shù)中,首先實例化了一個 Event
對象,用于多線程通信。
接著,又實例化了一個 Thread
對象,用來管理子線程,target
參數(shù)接收一個函數(shù) spin
,這個函數(shù)將在一個獨立的子線程中執(zhí)行,args
參數(shù)接收一個元組,在子線程中調(diào)用 spin
函數(shù)時,元組的各個參數(shù)將被原樣傳遞給 spin
函數(shù)。
Thread
對象必須要顯式的調(diào)用 start
方法才能啟動,所以代碼執(zhí)行到 spinner.start()
時,子線程才會真正開始執(zhí)行。子線程只會執(zhí)行 spin
函數(shù),至于下方的代碼與子線程無關(guān),都是主線程要執(zhí)行的代碼。
子線程的執(zhí)行對主線程執(zhí)行不會產(chǎn)生影響,主線程代碼會繼續(xù)往下運行,主線程調(diào)用 slow()
時會被耗時任務(wù)所阻塞。此時,子線程內(nèi)部代碼執(zhí)行不受影響,所以子線程會不停的打印旋轉(zhuǎn)指針。
等待 3 秒結(jié)束后,主線程中 slow()
函數(shù)返回結(jié)果,主線程調(diào)用 done.set()
將 Event
對象設(shè)置為 True
。此時,子線程 spin
函數(shù)內(nèi)部 done.wait(0.1)
會立即返回 True
,隨即 for
循環(huán)終止,spin
執(zhí)行完成后子線程也就退出了。
主線程不受子線程退出影響,會接著往下執(zhí)行,調(diào)用 spinner.join()
是為了等待子線程結(jié)束,主線程會阻塞在這里,保證子線程結(jié)束后才會往下執(zhí)行。顯然,子線程在執(zhí)行完 spin
函數(shù)就結(jié)束了,所以主線程代碼會繼續(xù)往下執(zhí)行。
supervisor
函數(shù)最終返回 slow
方法的返回值 result
。
入口函數(shù) main
打印 result
值后,主線程也退出了,程序終止。
以上,就是多線程版旋轉(zhuǎn)指針程序的全部邏輯了。
我們來測試下這個程序執(zhí)行效果:
多線程對象 spinner
輸出結(jié)果為 <Thread(Thread-1, initial)>
,其中 Thread-1
是線程名稱,initial
是線程狀態(tài),表示當前線程剛初始化完成,尚未啟動。
多進程版
Python 提供了 multiprocessing
來支持多進程,這個模塊的 API 基本模仿了多線程的 threading
模塊,所以有了前文的基礎(chǔ),多進程代碼也非常容易看懂。
同多線程一樣,multiprocessing
包也為多進程通信提供了 Event
對象。不同的是,threading.Event
是一個類,multiprocessing.Event
是一個函數(shù),它返回一個 synchronize.Event
類實例。所以 spin
函數(shù)簽名需要進行如下修改:
from multiprocessing import Process, Event from multiprocessing import synchronize def spin(msg: str, done: synchronize.Event) -> None: ...
spin
函數(shù)內(nèi)部代碼無需調(diào)整,只需要修改參數(shù) done
的類型注解即可。所以不難發(fā)現(xiàn) synchronize.Event
同樣支持 Event.wait(self, timeout=None)
方法。
多進程版本的 supervisor
函數(shù)也要稍作修改:
def supervisor() -> int: done = Event() spinner = Process(target=spin, args=('thinking!', done)) print(f'spinner object: {spinner}') spinner.start() result = slow() done.set() spinner.join() return result
雖然 multiprocessing.Event
和 threading.Event
類型不同,但二者用法和作用則完全相同。
這里使用 Process
實例化一個進程對象,Process
用法和 Thread
用法同樣如出一轍。
只需要對代碼做少量的改動,我們就將程序從多線程遷移到了多進程。這一點,Python 做的非常友好,掌握了多線程編程,基本上就掌握了多進程編程,我們只需要在適當?shù)臅r候,使用不同的模塊即可。
下面是多進程版本旋轉(zhuǎn)指針程序測試效果:
多進程對象 spinner
輸出結(jié)果為 <Process name='Process-1' parent=94367 initial>
,進程名稱為 Process-1
,parent
代表父進程 ID 為 94367
(即主進程 ID),initial
是進程狀態(tài),表示當前進程剛初始化完成,尚未啟動。
協(xié)程版
最后我們將使用協(xié)程實現(xiàn)旋轉(zhuǎn)指針程序,這一版本代碼改動會比較大。
Python 在 3.5 版本提供了 async
、await
關(guān)鍵字(可以參考 PEP 492),開始原生支持了協(xié)程,我們不再需要編寫難懂的 yeild from
來使用生成器實現(xiàn)協(xié)程功能了。
Python 協(xié)程通常在單線程的事件循環(huán)中運行。協(xié)程是一個可以掛起自身并在以后恢復(fù)的“函數(shù)”,async
用來定義協(xié)程,一個協(xié)程必須顯式的使用 await
關(guān)鍵字主動讓出控制權(quán),另一個協(xié)程才有機會在主事件循環(huán)的調(diào)度下并發(fā)的執(zhí)行。
協(xié)程版本旋轉(zhuǎn)指針程序需要對 spin
和 slow
兩個函數(shù)做如下修改:
import asyncio import itertools async def spin(msg: str) -> None: for char in itertools.cycle(r'\|/-'): status = f'\r{char} {msg}' print(status, end='', flush=True) try: await asyncio.sleep(0.1) except asyncio.CancelledError: break blanks = ' ' * len(status) print(f'\r{blanks}\r', end='') async def slow() -> int: await asyncio.sleep(3) return 42
首先我們使用 async def
將 spin
定義為一個協(xié)程,讓其不再是一個常規(guī)的函數(shù)。
spin
協(xié)程取消了第二個參數(shù),因為 Python 沒有為協(xié)程提供 Event
對象來進行通信,我們需要采用其他招式。
在原來使用 Event
通信的地方替換成了由 try/except
包裹的 await asyncio.sleep(0.1)
語句塊代碼。這段代碼塊有如下三個作用:
await asyncio.sleep(0.1)
的作用類似time.sleep
,可以讓程序暫停 0.1 秒。不同的是,使用await asyncio.sleep
暫停時不阻塞其他協(xié)程。- 因為這里加入了
await
關(guān)鍵字,代碼執(zhí)行到這里時,當前協(xié)程會主動讓出控制權(quán),不再繼續(xù)往下執(zhí)行,由事件循環(huán)來調(diào)度其他協(xié)程執(zhí)行。 - 如果在控制當前協(xié)程的
Task
實例中調(diào)用cancel
方法(有關(guān)Task
的內(nèi)容稍后會進行講解),await asyncio.sleep(0.1)
會拋出CancelledError
異常,這里使用try/except
捕獲異常后退出循環(huán)。這樣,我們就在多個協(xié)程間利用異常機制完成了通信,而不必借助于額外的Event
對象。
slow
函數(shù)也被改造為一個協(xié)程,其內(nèi)部原來編寫的阻塞代碼 time.sleep(3)
被替換為了 await asyncio.sleep(3)
。
可以發(fā)現(xiàn),其實協(xié)程與普通的函數(shù)在定義上差別不大,只不過多了兩個關(guān)鍵字 async
和 await
。但二者在執(zhí)行方式上大有不同,普通函數(shù)在使用 ()
運算符調(diào)用時(即 spin()
)會立刻執(zhí)行,而協(xié)程在使用 spin()
時只會創(chuàng)建一個協(xié)程對象,不會執(zhí)行。
要執(zhí)行上面兩個協(xié)程對象,我們還要對 supervisor
和 main
函數(shù)進行改造:
async def supervisor() -> int: spinner = asyncio.create_task(spin('thinking!')) print(f'spinner object: {spinner}') result = await slow() spinner.cancel() return result def main() -> None: result = asyncio.run(supervisor()) print(f'Answer: {result}') if __name__ == '__main__': main()
supervisor
函數(shù)同樣被修改為協(xié)程,spin('thinking!')
并不會像函數(shù)一樣立即執(zhí)行,只會創(chuàng)建一個協(xié)程對象,將它傳遞給 asyncio.create_task
,我們可以得到一個 asyncio.Task
對象,這個 Task
對象包裝了協(xié)程對象并調(diào)度其執(zhí)行,它還提供控制和查詢協(xié)程對象運行狀態(tài)的方法。
使用 await
關(guān)鍵字來調(diào)用 slow
協(xié)程,這將阻塞 supervisor
程序(但會讓出控制權(quán),使其他協(xié)程得以執(zhí)行),直到 slow
返回,返回結(jié)果賦值給 result
變量。
接著調(diào)用了 spinner.cancel()
,Task.cancel
方法調(diào)用后,將立即在 Task
所包裝的協(xié)程對象即 spin
協(xié)程中拋出 CancelledError
異常,spin
中需要使用 try/except
捕獲 await asyncio.sleep(0.1)
拋出的異常,這樣,就實現(xiàn)了不同協(xié)程之間通過異常進行通信。
main
是唯一的普通函數(shù),沒有被改造為協(xié)程。main
函數(shù)中的 asyncio.run
是整個協(xié)程的啟動入口,asyncio.run
函數(shù)啟動事件循環(huán),驅(qū)動 supervisor()
協(xié)程運行,最終也將啟動其他協(xié)程。
在以上示例代碼中,我們可以總結(jié)出運行協(xié)程的 3 種方式:
asyncio.run(coroutine())
:在一個常規(guī)函數(shù)中調(diào)用,是協(xié)程啟動入口,將開啟協(xié)程的事件循環(huán),調(diào)用后保持阻塞,直至拿到coroutine()
的返回結(jié)果。asyncio.create_task(coroutine())
:在協(xié)程中調(diào)用,接收另一個協(xié)程對象并調(diào)度其最終執(zhí)行,返回的Task
對象是對協(xié)程對象的包裝,并且提供控制和查詢協(xié)程對象運行狀態(tài)的方法。await coroutine()
:在協(xié)程中調(diào)用,await
關(guān)鍵字主動讓出執(zhí)行控制權(quán),終止當前協(xié)程執(zhí)行,直至拿到coroutine()
的返回結(jié)果。同時這也是一個表達式,返回結(jié)果即為coroutine()
返回值。
下面是協(xié)程版本旋轉(zhuǎn)指針程序測試效果:
在協(xié)程版本中,spinner
是一個 Task
對象,其字符串表示形式為 <Task pending name='Task-2' coro=<spin() running at /Users/jianghushinian/spin/spinner_async.py:8>>
。
根據(jù)以上示例代碼,我們可以總結(jié)出 Python 協(xié)程的最大特點:一處異步,處處異步。在協(xié)程中任何耗時操作都會減慢事件循環(huán),由于事件循環(huán)是單線程管理的,所以這會影響其他所有協(xié)程。在編寫協(xié)程代碼,要格外小心,不要寫出同步阻塞的代碼。好在如今 Python 已經(jīng)從語法層面原生支持協(xié)程,比使用生成器實現(xiàn)協(xié)程的年代要好多了。
給你留個小作業(yè):嘗試將 slow
協(xié)程中 await asyncio.sleep(3)
替換成普通的 time.sleep(3)
觀察下效果并思考為什么。
總結(jié)
本文我們分別使用了多線程、多進程以及協(xié)程這三種不同的并發(fā)模型實現(xiàn)了旋轉(zhuǎn)指針程序。三者比較起來,多線程、多進程在語法上差別不大,協(xié)程則大為不同,理解起來也更加困難。
由 supervisor
中打印的 spinner
對象結(jié)果可以看出,線程對象使用 Thread
來表示,進程對象使用 Process
來表示,協(xié)程對象則使用 Task
來表示。協(xié)程的定位是用戶態(tài)線程,相比傳統(tǒng)意義上的線程更加輕量,所以叫作 Task
也合理,代表同一個線程下的不同任務(wù)。
線程和進程無法在外部終止,即主線程和主進程無法終止由子線程或子進程來執(zhí)行的 spin
函數(shù),只能通過 Event
來進行通信,然后由 spin
函數(shù)內(nèi)部自己終止。協(xié)程則可以通過任務(wù)實例方法 Task.cancel()
進行通信,spin
協(xié)程中捕獲 CancelledError
異常后終止自身代碼。
記住,使用協(xié)程的代碼只有一個執(zhí)行流,就如同單線程代碼,同樣只有一個執(zhí)行流,只不過單線程代碼執(zhí)行流永遠是從上到下,而協(xié)程的執(zhí)行流則由事件循環(huán)來控制。
多線程和多進程模型是搶占式的,由操作系統(tǒng)進行調(diào)度執(zhí)行。用戶通常需要控制的是不要讓多個線程(進程)同時操作同一個數(shù)據(jù),常使用互斥鎖來解決這一問題。而協(xié)程只有一個控制循環(huán),協(xié)程的控制權(quán)在我們自己手里,我們決定什么時候切換其他任務(wù)來執(zhí)行。所以在編寫協(xié)程代碼時,要時刻注意不要寫出同步阻塞代碼。
到此這篇關(guān)于詳解Python如何使用并發(fā)模型編程的文章就介紹到這了,更多相關(guān)Python并發(fā)模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python中的elasticsearch_dsl查詢語句轉(zhuǎn)換成es查詢語句詳解
這篇文章主要介紹了python中的elasticsearch_dsl查詢語句轉(zhuǎn)換成es查詢語句詳解,ElasticSearch在實際生產(chǎn)里通常和LogStash,Kibana,F(xiàn)ileBeat一起構(gòu)成Elastic?Stack來使用,它是這些組件里面最核心的一個,需要的朋友可以參考下2023-07-07Anconda環(huán)境下Vscode安裝Python的方法詳解
anaconda指的是一個開源的Python發(fā)行版本,其包含了conda、Python等180多個科學包及其依賴項。這篇文章主要介紹了Anconda環(huán)境下Vscode安裝Python的方法,需要的朋友可以參考下2020-03-03Python數(shù)據(jù)分析之PMI數(shù)據(jù)圖形展示
這篇文章主要介紹了Python數(shù)據(jù)分析之PMI數(shù)據(jù)圖形展示,文章介紹了簡單的python爬蟲,并使用numpy進行了簡單的數(shù)據(jù)處理,最終使用?matplotlib?進行圖形繪制,實現(xiàn)了直觀的方式展示制造業(yè)和非制造業(yè)指數(shù)圖形,需要的朋友可以參考一下2022-05-05一文教你如何用Python輕輕松松操作Excel,Word,CSV
數(shù)據(jù)處理是 Python 的一大應(yīng)用場景,而 Excel 又是當前最流行的數(shù)據(jù)處理軟件。本文將為大家詳細介紹一下如何用Python輕輕松松操作Excel、Word、CSV,需要的可以參考一下2022-02-02python 監(jiān)測內(nèi)存和cpu的使用率實例
今天小編就為大家分享一篇python 監(jiān)測內(nèi)存和cpu的使用率實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11