一文了解Python并發(fā)編程的工程實(shí)現(xiàn)方法
上一篇文章介紹了線(xiàn)程的使用。然而 Python 中由于 Global Interpreter Lock
(全局解釋鎖 GIL )的存在,每個(gè)線(xiàn)程在在執(zhí)行時(shí)需要獲取到這個(gè) GIL ,在同一時(shí)刻中只有一個(gè)線(xiàn)程得到解釋鎖的執(zhí)行, Python 中的線(xiàn)程并沒(méi)有真正意義上的并發(fā)執(zhí)行,多線(xiàn)程的執(zhí)行效率也不一定比單線(xiàn)程的效率更高。 如果要充分利用現(xiàn)代多核 CPU 的并發(fā)能力,就要使用 multipleprocessing 模塊了。
0x01 multipleprocessing
與使用線(xiàn)程的 threading 模塊類(lèi)似, multipleprocessing
模塊提供許多高級(jí) API 。最常見(jiàn)的是 Pool 對(duì)象了,使用它的接口能很方便地寫(xiě)出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素 print(p.map(f, [1, 2, 3])) # 執(zhí)行結(jié)果 # [1, 4, 9]
關(guān)于 Pool 下文中還會(huì)提到,這里我們先來(lái)看 Process 。
Process
要?jiǎng)?chuàng)建一個(gè)進(jìn)程可以使用 Process 類(lèi),使用 start() 方法啟動(dòng)進(jìn)程。
from multiprocessing import Process import os def echo(text): # 父進(jìn)程ID print("Process Parent ID : ", os.getppid()) # 進(jìn)程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 執(zhí)行結(jié)果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
進(jìn)程池
正如開(kāi)篇提到的 multiprocessing
模塊提供了 Pool 類(lèi)可以很方便地實(shí)現(xiàn)一些簡(jiǎn)單多進(jìn)程場(chǎng)景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會(huì)阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 異步執(zhí)行 func(args,kwds) ,會(huì)立即返回一個(gè) result 對(duì)象,如果指定了 callback 參數(shù),結(jié)果會(huì)通過(guò)回調(diào)方法返回,還可以指定執(zhí)行出錯(cuò)的回調(diào)方法 error_callback()
- map(func, iterable[, chunksize])
- 類(lèi)似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 異步版本的 map
- close()
- 關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時(shí),進(jìn)程會(huì)退出。
- terminate()
- 終止進(jìn)程池
- join()
- 等待工作進(jìn)程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素 a = p.map(f, [1, 2, 3]) print(a) # 異步執(zhí)行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一個(gè)result對(duì)象,代表方法的執(zhí)行結(jié)果 print(b) # 為了拿到結(jié)果,使用join方法等待池中工作進(jìn)程退出 p.close() # 調(diào)用join方法前,需先執(zhí)行close或terminate方法 p.join() # 獲取執(zhí)行結(jié)果 print(b.get()) # 執(zhí)行結(jié)果 # [1, 4, 9] # <multiprocessing.pool.MapResult object at 0x10631b710> # [9, 25, 49, 121]
map_async() 和 apply_async() 執(zhí)行后會(huì)返回一個(gè) class multiprocessing.pool.AsyncResult 對(duì)象,通過(guò)它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準(zhǔn)備好。
進(jìn)程間數(shù)據(jù)的傳輸
multiprocessing 模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊(duì)列( Queue )和管道( Pipe )
Queue 是線(xiàn)程安全,也是進(jìn)程安全的。使用 Queue 可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的 demo 中子進(jìn)程 put 一個(gè)對(duì)象,在主進(jìn)程中就能 get 到這個(gè)對(duì)象。 任何可以序列化的對(duì)象都可以通過(guò) Queue 來(lái)傳輸。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue進(jìn)行數(shù)據(jù)通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù) print(q.get()) # prints "[42, None, 'hello']" p.join() # 執(zhí)行結(jié)果 # [42, None, 'hello']
Pipe() 返回一對(duì)通過(guò)管道連接的 Connection 對(duì)象。這兩個(gè)對(duì)象可以理解為管道的兩端,它們通過(guò) send() 和 recv() 發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipe def write(conn): # 子進(jìn)程中發(fā)送一個(gè)對(duì)象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在讀的進(jìn)程中通過(guò)recv接收對(duì)象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一對(duì)連接對(duì)象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 執(zhí)行結(jié)果 # [42, None, 'hello']
需要注意的是,兩個(gè)進(jìn)程不能同時(shí)對(duì)一個(gè)連接對(duì)象進(jìn)行 send 或 recv 操作。
同步
我們知道線(xiàn)程間的同步是通過(guò)鎖機(jī)制來(lái)實(shí)現(xiàn)的,進(jìn)程也一樣。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先執(zhí)行有鎖的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再執(zhí)行無(wú)鎖的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執(zhí)行無(wú)鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進(jìn)程間的同步原語(yǔ)。其用法也與線(xiàn)程間的同步原語(yǔ)很類(lèi)似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用 隊(duì)列或管道。
0x02 總結(jié)
本文對(duì) multiprocessing 模塊中常見(jiàn)的 API 作了簡(jiǎn)單的介紹。講述了 Process 和 Pool 的常見(jiàn)用法,同時(shí)介紹了進(jìn)程間的數(shù)據(jù)方式:隊(duì)列和管道。最后簡(jiǎn)單了解了進(jìn)程間的同步原語(yǔ)。
相關(guān)文章
python 成功引入包但無(wú)法正常調(diào)用的解決
這篇文章主要介紹了python 成功引入包但無(wú)法正常調(diào)用的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03Python實(shí)現(xiàn)快速查找并替換Excel中的數(shù)據(jù)
Excel中的查找替換是一個(gè)非常實(shí)用的功能,能夠幫助用戶(hù)快速完成大量數(shù)據(jù)的整理和處理工作,避免手動(dòng)逐一修改數(shù)據(jù)的麻煩,提高工作效率,所以本文給大家介紹了Python實(shí)現(xiàn)快速查找并替換Excel中的數(shù)據(jù),需要的朋友可以參考下2024-06-06python3光學(xué)字符識(shí)別模塊tesserocr與pytesseract的使用詳解
這篇文章主要介紹了python3光學(xué)字符識(shí)別模塊tesserocr與pytesseract的使用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02matplotlib中l(wèi)egend位置調(diào)整解析
這篇文章主要介紹了matplotlib中l(wèi)egend位置調(diào)整解析,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12python UDF 實(shí)現(xiàn)對(duì)csv批量md5加密操作
這篇文章主要介紹了python UDF 實(shí)現(xiàn)對(duì)csv批量md5加密操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01Python使用Selenium模擬瀏覽器自動(dòng)操作功能
這篇文章主要介紹了Python使用Selenium模擬瀏覽器自動(dòng)操作功能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09快速解決jupyter notebook啟動(dòng)需要密碼的問(wèn)題
這篇文章主要介紹了快速解決jupyter notebook啟動(dòng)需要密碼的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-04-04Python多線(xiàn)程同步---文件讀寫(xiě)控制方法
今天小編就為大家分享一篇Python多線(xiàn)程同步---文件讀寫(xiě)控制方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02Python使用grequests(gevent+requests)并發(fā)發(fā)送請(qǐng)求過(guò)程解析
這篇文章主要介紹了Python使用grequests并發(fā)發(fā)送請(qǐng)求過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09