一文了解Python并發(fā)編程的工程實現(xiàn)方法
上一篇文章介紹了線程的使用。然而 Python 中由于 Global Interpreter Lock
(全局解釋鎖 GIL )的存在,每個線程在在執(zhí)行時需要獲取到這個 GIL ,在同一時刻中只有一個線程得到解釋鎖的執(zhí)行, Python 中的線程并沒有真正意義上的并發(fā)執(zhí)行,多線程的執(zhí)行效率也不一定比單線程的效率更高。 如果要充分利用現(xiàn)代多核 CPU 的并發(fā)能力,就要使用 multipleprocessing 模塊了。
0x01 multipleprocessing
與使用線程的 threading 模塊類似, multipleprocessing
模塊提供許多高級 API 。最常見的是 Pool 對象了,使用它的接口能很方便地寫出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個元素 print(p.map(f, [1, 2, 3])) # 執(zhí)行結(jié)果 # [1, 4, 9]
關于 Pool 下文中還會提到,這里我們先來看 Process 。
Process
要創(chuàng)建一個進程可以使用 Process 類,使用 start() 方法啟動進程。
from multiprocessing import Process import os def echo(text): # 父進程ID print("Process Parent ID : ", os.getppid()) # 進程ID print("Process PID : ", os.getpid()) print('echo : ', text) if __name__ == '__main__': p = Process(target=echo, args=('hello process',)) p.start() p.join() # 執(zhí)行結(jié)果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process
進程池
正如開篇提到的 multiprocessing
模塊提供了 Pool 類可以很方便地實現(xiàn)一些簡單多進程場景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 執(zhí)行 func(args,kwds) 方法,在方法結(jié)束返回前會阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 異步執(zhí)行 func(args,kwds) ,會立即返回一個 result 對象,如果指定了 callback 參數(shù),結(jié)果會通過回調(diào)方法返回,還可以指定執(zhí)行出錯的回調(diào)方法 error_callback()
- map(func, iterable[, chunksize])
- 類似內(nèi)置函數(shù) map() ,可以并發(fā)執(zhí)行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 異步版本的 map
- close()
- 關閉進程池。當池中的所有工作進程都執(zhí)行完畢時,進程會退出。
- terminate()
- 終止進程池
- join()
- 等待工作進程執(zhí)行完,必需先調(diào)用 close() 或者 terminate()
from multiprocessing import Pool def f(x): return x * x if __name__ == '__main__': with Pool(5) as p: # map方法的作用是將f()方法并發(fā)地映射到列表中的每個元素 a = p.map(f, [1, 2, 3]) print(a) # 異步執(zhí)行map b = p.map_async(f, [3, 5, 7, 11]) # b 是一個result對象,代表方法的執(zhí)行結(jié)果 print(b) # 為了拿到結(jié)果,使用join方法等待池中工作進程退出 p.close() # 調(diào)用join方法前,需先執(zhí)行close或terminate方法 p.join() # 獲取執(zhí)行結(jié)果 print(b.get()) # 執(zhí)行結(jié)果 # [1, 4, 9] # <multiprocessing.pool.MapResult object at 0x10631b710> # [9, 25, 49, 121]
map_async() 和 apply_async() 執(zhí)行后會返回一個 class multiprocessing.pool.AsyncResult 對象,通過它的 get() 可以獲取到執(zhí)行結(jié)果, ready() 可以判斷 AsyncResult 的結(jié)果是否準備好。
進程間數(shù)據(jù)的傳輸
multiprocessing 模塊提供了兩種方式用于進程間的數(shù)據(jù)共享:隊列( Queue )和管道( Pipe )
Queue 是線程安全,也是進程安全的。使用 Queue 可以實現(xiàn)進程間的數(shù)據(jù)共享,例如下面的 demo 中子進程 put 一個對象,在主進程中就能 get 到這個對象。 任何可以序列化的對象都可以通過 Queue 來傳輸。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': # 使用Queue進行數(shù)據(jù)通信 q = Queue() p = Process(target=f, args=(q,)) p.start() # 主進程取得子進程中的數(shù)據(jù) print(q.get()) # prints "[42, None, 'hello']" p.join() # 執(zhí)行結(jié)果 # [42, None, 'hello']
Pipe() 返回一對通過管道連接的 Connection 對象。這兩個對象可以理解為管道的兩端,它們通過 send() 和 recv() 發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipe def write(conn): # 子進程中發(fā)送一個對象 conn.send([42, None, 'hello']) conn.close() def read(conn): # 在讀的進程中通過recv接收對象 data = conn.recv() print(data) if __name__ == '__main__': # Pipe()方法返回一對連接對象 w_conn, r_conn = Pipe() wp = Process(target=write, args=(w_conn,)) rp = Process(target=read, args=(r_conn,)) wp.start() rp.start() # 執(zhí)行結(jié)果 # [42, None, 'hello']
需要注意的是,兩個進程不能同時對一個連接對象進行 send 或 recv 操作。
同步
我們知道線程間的同步是通過鎖機制來實現(xiàn)的,進程也一樣。
from multiprocessing import Process, Lock import time def print_with_lock(l, i): l.acquire() try: time.sleep(1) print('hello world', i) finally: l.release() def print_without_lock(i): time.sleep(1) print('hello world', i) if __name__ == '__main__': lock = Lock() # 先執(zhí)行有鎖的 for num in range(5): Process(target=print_with_lock, args=(lock, num)).start() # 再執(zhí)行無鎖的 # for num in range(5): # Process(target=print_without_lock, args=(num,)).start()
有鎖的代碼將每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果執(zhí)行無鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 Lock ,還包括 RLock 、 Condition 、 Semaphore 和 Event 等進程間的同步原語。其用法也與線程間的同步原語很類似。 API 使用可以參考文末中引用的文檔鏈接。
在工程中實現(xiàn)進程間的數(shù)據(jù)共享應當優(yōu)先使用 隊列或管道。
0x02 總結(jié)
本文對 multiprocessing 模塊中常見的 API 作了簡單的介紹。講述了 Process 和 Pool 的常見用法,同時介紹了進程間的數(shù)據(jù)方式:隊列和管道。最后簡單了解了進程間的同步原語。
相關文章
Python實現(xiàn)快速查找并替換Excel中的數(shù)據(jù)
Excel中的查找替換是一個非常實用的功能,能夠幫助用戶快速完成大量數(shù)據(jù)的整理和處理工作,避免手動逐一修改數(shù)據(jù)的麻煩,提高工作效率,所以本文給大家介紹了Python實現(xiàn)快速查找并替換Excel中的數(shù)據(jù),需要的朋友可以參考下2024-06-06python3光學字符識別模塊tesserocr與pytesseract的使用詳解
這篇文章主要介紹了python3光學字符識別模塊tesserocr與pytesseract的使用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-02-02matplotlib中l(wèi)egend位置調(diào)整解析
這篇文章主要介紹了matplotlib中l(wèi)egend位置調(diào)整解析,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12python UDF 實現(xiàn)對csv批量md5加密操作
這篇文章主要介紹了python UDF 實現(xiàn)對csv批量md5加密操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01Python使用grequests(gevent+requests)并發(fā)發(fā)送請求過程解析
這篇文章主要介紹了Python使用grequests并發(fā)發(fā)送請求過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09