簡單談?wù)刾ython中的Queue與多進程
最近接觸一個項目,要在多個虛擬機中運行任務(wù),參考別人之前項目的代碼,采用了多進程來處理,于是上網(wǎng)查了查python中的多進程
一、先說說Queue(隊列對象)
Queue是python中的標準庫,可以直接import 引用,之前學習的時候有聽過著名的“先吃先拉”與“后吃先吐”,其實就是這里說的隊列,隊列的構(gòu)造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構(gòu)造的時候不寫或者寫個小于1的數(shù)則表示無限多
import Queue
q = Queue.Queue(10)
向隊列中放值(put)
q.put(‘yang')
q.put(4)
q.put([‘yan','xing'])
在隊列中取值get()
默認的隊列是先進先出的
>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']
當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到
get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常
所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值
隊列中常用的方法
Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)
二、multiprocessing中使用子進程概念
from multiprocessing import Process
可以通過Process來構(gòu)造一個子進程
p = Process(target=fun,args=(args))
再通過p.start()來啟動子進程
再通過p.join()方法來使得子進程運行結(jié)束后再執(zhí)行父進程
from multiprocessing import Process import os # 子進程要執(zhí)行的代碼 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
三、在multiprocessing中使用pool
如果需要多個子進程時可以考慮使用進程池(pool)來管理
from multiprocessing import Pool
from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
pool創(chuàng)建子進程的方法與Process不同,是通過
p.apply_async(func,args=(args))實現(xiàn),一個池子里能同時運行的任務(wù)是取決你電腦的cpu數(shù)量,如我的電腦現(xiàn)在是有4個cpu,那會子進程task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個進程結(jié)束后才開始
上面的程序運行后的結(jié)果其實是按照上圖中1,2,3分開進行的,先打印1,3秒后打印2,再3秒后打印3
代碼中的p.close()是關(guān)掉進程池子,是不再向里面添加進程了,對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
當時也可以是實例pool的時候給它定義一個進程的多少
如果上面的代碼中p=Pool(5)那么所有的子進程就可以同時進行
三、多個子進程間的通信
多個子進程間的通信就要采用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數(shù)據(jù),另外一個進程從隊列中取數(shù)據(jù),
#coding:gbk from multiprocessing import Process, Queue import os, time, random # 寫數(shù)據(jù)進程執(zhí)行的代碼: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 讀數(shù)據(jù)進程執(zhí)行的代碼: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父進程創(chuàng)建Queue,并傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進程pw,寫入: pw.start() # 等待pw結(jié)束: pw.join() # 啟動子進程pr,讀取: pr.start() pr.join() # pr進程里是死循環(huán),無法等待其結(jié)束,只能強行終止: print print '所有數(shù)據(jù)都寫入并且讀完'
四、關(guān)于上面代碼的幾個有趣的問題
if __name__=='__main__': # 父進程創(chuàng)建Queue,并傳給各個子進程: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有數(shù)據(jù)都寫入并且讀完'
如果main函數(shù)寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數(shù)傳入進程池子里的每個子進程,但是卻得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類
if __name__=='__main__': manager = multiprocessing.Manager() # 父進程創(chuàng)建Queue,并傳給各個子進程: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有數(shù)據(jù)都寫入并且讀完'
這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以后再擴展multiprocess中的Manager類吧
關(guān)于鎖的應(yīng)用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數(shù)操作隊列的時候給它加把鎖,這樣在同一個時間內(nèi)則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖
#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 寫數(shù)據(jù)進程執(zhí)行的代碼: def write(q,lock): lock.acquire() #加上鎖 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #釋放鎖 # 讀數(shù)據(jù)進程執(zhí)行的代碼: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父進程創(chuàng)建Queue,并傳給各個子進程: q = manager.Queue() lock = manager.Lock() #初始化一把鎖 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有數(shù)據(jù)都寫入并且讀完'
- Python多進程并發(fā)(multiprocessing)用法實例詳解
- 淺析Python中的多進程與多線程的使用
- Python多進程通信Queue、Pipe、Value、Array實例
- Python中使用多進程來實現(xiàn)并行處理的方法小結(jié)
- Python多進程同步Lock、Semaphore、Event實例
- Python控制多進程與多線程并發(fā)數(shù)總結(jié)
- Python多線程、異步+多進程爬蟲實現(xiàn)代碼
- python 多進程通信模塊的簡單實現(xiàn)
- Python多進程庫multiprocessing中進程池Pool類的使用詳解
- 詳解python之多進程和進程池(Processing庫)
- Python自定義進程池實例分析【生產(chǎn)者、消費者模型問題】
- python多進程(加入進程池)操作常見案例
相關(guān)文章
基于python traceback實現(xiàn)異常的獲取與處理
這篇文章主要介紹了基于python traceback實現(xiàn)異常的獲取與處理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-12-12PyTorch實現(xiàn)手寫數(shù)字識別的示例代碼
本文主要介紹了PyTorch實現(xiàn)手寫數(shù)字識別的示例代碼,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下<BR>2022-05-05Python+OpenCV編寫車輛計數(shù)器系統(tǒng)
本文,我們將使用歐幾里德距離跟蹤和輪廓的概念在 Python 中使用 OpenCV 構(gòu)建車輛計數(shù)器系統(tǒng),文中的示例代碼講解詳細,感興趣的可以了解一下2022-05-05Python?OpenCV實現(xiàn)圖片預(yù)處理的方法詳解
這篇文章主要為大家詳細介紹了Python?OpenCV實現(xiàn)圖片預(yù)處理的方法,文中的示例代碼講解詳細,具有一定的借鑒價值,感興趣的可以了解一下2022-09-09Python 更快進行探索性數(shù)據(jù)分析的四個方法
今天我給大家分享幾種更快的探索性數(shù)據(jù)分析方法,它們可以進一步加速 EDA。 我們以一個學生考試成績的例子,創(chuàng)建一個如下所示的 DataFrame 并繼續(xù)操作。歡迎收藏學習,喜歡點贊支持2021-11-11