Python3多進程 multiprocessing 模塊實例詳解
本文實例講述了Python3多進程 multiprocessing 模塊。分享給大家供大家參考,具體如下:
多進程 Multiprocessing 模塊
Process 類
Process 類用來描述一個進程對象。創(chuàng)建子進程的時候,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù)即可完成 Process 示例的創(chuàng)建。
star() 方法啟動進程,
join() 方法實現(xiàn)進程間的同步,等待所有進程退出。
close() 用來阻止多余的進程涌入進程池 Pool 造成進程阻塞。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函數(shù)名字,需要調(diào)用的函數(shù)
args 函數(shù)需要的參數(shù),以 tuple 的形式傳入
示例:
import multiprocessing import os def run_proc(name): print('Child process {0} {1} Running '.format(name, os.getpid())) if __name__ == '__main__': print('Parent process {0} is Running'.format(os.getpid())) for i in range(5): p = multiprocessing.Process(target=run_proc, args=(str(i),)) print('process start') p.start() p.join() print('Process close')
結(jié)果:
Parent process 809 is Running
process start
process start
process start
process start
process start
Child process 0 810 Running
Child process 1 811 Running
Child process 2 812 Running
Child process 3 813 Running
Child process 4 814 Running
Process close
Pool
Pool 可以提供指定數(shù)量的進程供用戶使用,默認是 CPU 核數(shù)。當(dāng)有新的請求提交到 Poll 的時候,如果池子沒有滿,會創(chuàng)建一個進程來執(zhí)行,否則就會讓該請求等待。
- Pool 對象調(diào)用 join 方法會等待所有的子進程執(zhí)行完畢
- 調(diào)用 join 方法之前,必須調(diào)用 close
- 調(diào)用 close 之后就不能繼續(xù)添加新的 Process 了
pool.apply_async
apply_async
方法用來同步執(zhí)行進程,允許多個進程同時進入池子。
import multiprocessing import os import time def run_task(name): print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid())) time.sleep(1) print('Task {0} end.'.format(name)) if __name__ == '__main__': print('current process {0}'.format(os.getpid())) p = multiprocessing.Pool(processes=3) for i in range(6): p.apply_async(run_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All processes done!')
結(jié)果:
current process 921
Waiting for all subprocesses done...
Task 0 pid 922 is running, parent id is 921
Task 1 pid 923 is running, parent id is 921
Task 2 pid 924 is running, parent id is 921
Task 0 end.
Task 3 pid 922 is running, parent id is 921
Task 1 end.
Task 4 pid 923 is running, parent id is 921
Task 2 end.
Task 5 pid 924 is running, parent id is 921
Task 3 end.
Task 4 end.
Task 5 end.
All processes done!
pool.apply
apply(func[, args[, kwds]])
該方法只能允許一個進程進入池子,在一個進程結(jié)束之后,另外一個進程才可以進入池子。
import multiprocessing import os import time def run_task(name): print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid())) time.sleep(1) print('Task {0} end.'.format(name)) if __name__ == '__main__': print('current process {0}'.format(os.getpid())) p = multiprocessing.Pool(processes=3) for i in range(6): p.apply(run_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All processes done!')
結(jié)果:
Task 0 pid 928 is running, parent id is 927
Task 0 end.
Task 1 pid 929 is running, parent id is 927
Task 1 end.
Task 2 pid 930 is running, parent id is 927
Task 2 end.
Task 3 pid 928 is running, parent id is 927
Task 3 end.
Task 4 pid 929 is running, parent id is 927
Task 4 end.
Task 5 pid 930 is running, parent id is 927
Task 5 end.
Waiting for all subprocesses done...
All processes done!
Queue 進程間通信
Queue 用來在多個進程間通信。Queue 有兩個方法,get 和 put。
put 方法
Put 方法用來插入數(shù)據(jù)到隊列中,有兩個可選參數(shù),blocked 和 timeout。
- blocked = True(默認值),timeout 為正
該方法會阻塞 timeout 指定的時間,直到該隊列有剩余空間。如果超時,拋出 Queue.Full 異常。
blocked = False
如果 Queue 已滿,立刻拋出 Queue.Full 異常
get 方法
get 方法用來從隊列中讀取并刪除一個元素。有兩個參數(shù)可選,blocked 和 timeout
- blocked = False (默認),timeout 正值
等待時間內(nèi),沒有取到任何元素,會拋出 Queue.Empty 異常。
blocked = True
Queue 有一個值可用,立刻返回改值;Queue 沒有任何元素,
from multiprocessing import Process, Queue import os, time, random # 寫數(shù)據(jù)進程執(zhí)行的代碼: def proc_write(q,urls): print('Process(%s) is writing...' % os.getpid()) for url in urls: q.put(url) print('Put %s to queue...' % url) time.sleep(random.random()) # 讀數(shù)據(jù)進程執(zhí)行的代碼: def proc_read(q): print('Process(%s) is reading...' % os.getpid()) while True: url = q.get(True) print('Get %s from queue.' % url) if __name__=='__main__': # 父進程創(chuàng)建Queue,并傳給各個子進程: q = Queue() proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3'])) proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6'])) proc_reader = Process(target=proc_read, args=(q,)) # 啟動子進程proc_writer,寫入: proc_writer1.start() proc_writer2.start() # 啟動子進程proc_reader,讀取: proc_reader.start() # 等待proc_writer結(jié)束: proc_writer1.join() proc_writer2.join() # proc_reader進程里是死循環(huán),無法等待其結(jié)束,只能強行終止: proc_reader.terminate()
結(jié)果:
Process(1083) is writing...
Put url_1 to queue...
Process(1084) is writing...
Put url_4 to queue...
Process(1085) is reading...
Get url_1 from queue.
Get url_4 from queue.
Put url_5 to queue...
Get url_5 from queue.
Put url_2 to queue...
Get url_2 from queue.
Put url_6 to queue...
Get url_6 from queue.
Put url_3 to queue...
Get url_3 from queue.
Pipe 進程間通信
常用來在兩個進程間通信,兩個進程分別位于管道的兩端。
multiprocessing.Pipe([duplex])
示例一和示例二,也是網(wǎng)上找的別人的例子,嘗試?yán)斫獠⒃黾恿俗⑨尪选>W(wǎng)上的例子,大多是例子一和例子二在一起的,這里分開來看,比較容易理解。
示例一:
from multiprocessing import Process, Pipe def send(pipe): pipe.send(['spam'] + [42, 'egg']) # send 傳輸一個列表 pipe.close() if __name__ == '__main__': (con1, con2) = Pipe() # 創(chuàng)建兩個 Pipe 實例 sender = Process(target=send, args=(con1, )) # 函數(shù)的參數(shù),args 一定是實例化之后的 Pip 變量,不能直接寫 args=(Pip(),) sender.start() # Process 類啟動進程 print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 從send收到消息 con2.close() # 關(guān)閉管道
結(jié)果:
con2 got: ['spam', 42, 'egg']
示例二:
from multiprocessing import Process, Pipe def talk(pipe): pipe.send(dict(name='Bob', spam=42)) # 傳輸一個字典 reply = pipe.recv() # 接收傳輸?shù)臄?shù)據(jù) print('talker got:', reply) if __name__ == '__main__': (parentEnd, childEnd) = Pipe() # 創(chuàng)建兩個 Pipe() 實例,也可以改成 conf1, conf2 child = Process(target=talk, args=(childEnd,)) # 創(chuàng)建一個 Process 進程,名稱為 child child.start() # 啟動進程 print('parent got:', parentEnd.recv()) # parentEnd 是一個 Pip() 管道,可以接收 child Process 進程傳輸?shù)臄?shù)據(jù) parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一個 Pip() 管道,可以使用 send 方法來傳輸數(shù)據(jù) child.join() # 傳輸?shù)臄?shù)據(jù)被 talk 函數(shù)內(nèi)的 pip 管道接收,并賦值給 reply print('parent exit')
結(jié)果:
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設(shè)計有所幫助。
- Python多進程并發(fā)(multiprocessing)用法實例詳解
- Python多進程multiprocessing用法實例分析
- python multiprocessing多進程變量共享與加鎖的實現(xiàn)
- Python標(biāo)準(zhǔn)庫之多進程(multiprocessing包)介紹
- python基于multiprocessing的多進程創(chuàng)建方法
- python multiprocessing 多進程并行計算的操作
- 簡單學(xué)習(xí)Python多進程Multiprocessing
- Python使用multiprocessing實現(xiàn)多進程的詳細步驟記錄
相關(guān)文章
TensorFlow使用Graph的基本操作的實現(xiàn)
這篇文章主要介紹了TensorFlow使用Graph的基本操作的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04Python入門Anaconda和Pycharm的安裝和配置詳解
這篇文章主要介紹了Python入門Anaconda和Pycharm的安裝和配置詳解,文章通過圖文介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07python中readline判斷文件讀取結(jié)束的方法
這篇文章主要介紹了python中readline判斷文件讀取結(jié)束的方法,實例形式詳細分析了Python中readline的用法,需要的朋友可以參考下2014-11-11Python辦公自動化解決world文件批量轉(zhuǎn)換
本文分享如何用 Python 來讀取 Word、寫入 Word、將 Word 轉(zhuǎn)換為 pdf。學(xué)會之后,如果遇到大量 Word 文件需要處理的時候,就不慌了2021-09-09