Python實(shí)現(xiàn)進(jìn)程同步和通信的方法
Python中的多線程其實(shí)并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進(jìn)程。Python提供了非常好用的多進(jìn)程包multiprocessing,只需要定義一個(gè)函數(shù),Python會(huì)完成其他所有事情。借助這個(gè)包,可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
引例:
如之前創(chuàng)建多進(jìn)程的例子
# -*- coding:utf-8 -*- from multiprocessing import Process,Pool import os,time def run_proc(name): ##定義一個(gè)函數(shù)用于進(jìn)程調(diào)用 for i in range(5): time.sleep(0.2) #休眠0.2秒 print 'Run child process %s (%s)' % (name, os.getpid()) #執(zhí)行一次該函數(shù)共需1秒的時(shí)間 if __name__ =='__main__': #執(zhí)行主進(jìn)程 print 'Run the main process (%s).' % (os.getpid()) mainStart = time.time() #記錄主進(jìn)程開(kāi)始的時(shí)間 p = Pool(8) #開(kāi)辟進(jìn)程池 for i in range(16): #開(kāi)辟14個(gè)進(jìn)程 p.apply_async(run_proc,args=('Process'+str(i),))#每個(gè)進(jìn)程都調(diào)用run_proc函數(shù), #args表示給該函數(shù)傳遞的參數(shù)。 print 'Waiting for all subprocesses done ...' p.close() #關(guān)閉進(jìn)程池 p.join() #等待開(kāi)辟的所有進(jìn)程執(zhí)行完后,主進(jìn)程才繼續(xù)往下執(zhí)行 print 'All subprocesses done' mainEnd = time.time() #記錄主進(jìn)程結(jié)束時(shí)間 print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主進(jìn)程執(zhí)行時(shí)間
運(yùn)行結(jié)果:
Run the main process (36652). Waiting for all subprocesses done … Run child process Process0 (36708)Run child process Process1 (36748) Run child process Process3 (36736) Run child process Process2 (36716) Run child process Process4 (36768)
如第3行的輸出,偶爾會(huì)出現(xiàn)這樣不如意的輸入格式,為什么呢?
原因是多個(gè)進(jìn)程爭(zhēng)用打印輸出資源的結(jié)果。前一個(gè)進(jìn)程為來(lái)得急輸出換行符,該資源就切換給了另一個(gè)進(jìn)程使用,致使兩個(gè)進(jìn)程輸出在同一行上,而前一個(gè)進(jìn)程的換行符在下一次獲得資源時(shí)才打印輸出。
Lock
為了避免這種情況,需在進(jìn)程進(jìn)入臨界區(qū)(使進(jìn)程進(jìn)入臨界資源的那段代碼,稱為臨界區(qū))時(shí)加鎖。
可以向如下這樣添加鎖后看看執(zhí)行效果:
# -*- coding:utf-8 -*- lock = Lock() #申明一個(gè)全局的lock對(duì)象 def run_proc(name): global lock #引用全局鎖 for i in range(5): time.sleep(0.2) lock.acquire() #申請(qǐng)鎖 print 'Run child process %s (%s)' % (name, os.getpid()) lock.release() #釋放鎖
Semaphore
Semaphore為信號(hào)量機(jī)制。當(dāng)共享的資源擁有多個(gè)時(shí),可用Semaphore來(lái)實(shí)現(xiàn)進(jìn)程同步。其用法和Lock差不多,s = Semaphore(N),每執(zhí)行一次s.acquire(),該資源的可用個(gè)數(shù)將減少1,當(dāng)資源個(gè)數(shù)已為0時(shí),就進(jìn)入阻塞;每執(zhí)行一次s.release(),占用的資源被釋放,該資源的可用個(gè)數(shù)增加1。
多進(jìn)程的通信(信息交互)
不同進(jìn)程之間進(jìn)行數(shù)據(jù)交互,可能不少剛開(kāi)始接觸多進(jìn)程的同學(xué)會(huì)想到共享全局變量的方式,這樣通過(guò)向全局變量寫入和讀取信息便能實(shí)現(xiàn)信息交互。但是很遺憾,并不能這樣實(shí)現(xiàn)。
下面通過(guò)例子,加深對(duì)那篇文章的理解:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool import os import time L1 = [1, 2, 3] def add(a, b): global L1 L1 += range(a, b) print L1 if __name__ == '__main__': p1 = Process(target=add, args=(20, 30)) p2 = Process(target=add, args=(30, 40)) p1.start() p2.start() p1.join() p2.join() print L1
輸出結(jié)果:
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]
該程序的原本目的是想將兩個(gè)子進(jìn)程生成的列表加到全局變量L1中,但用該方法并不能達(dá)到想要的效果。既然不能通過(guò)全局變量來(lái)實(shí)現(xiàn)不同進(jìn)程間的信息交互,那有什么辦法呢。
mutiprocessing為我們可以通過(guò)Queue和Pipe來(lái)實(shí)現(xiàn)進(jìn)程間的通信。
Queue
按上面的例子通過(guò)Queue來(lái)實(shí)現(xiàn):
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, Lock L = [1, 2, 3] def add(q, lock, a, b): lock.acquire() # 加鎖避免寫入時(shí)出現(xiàn)不可預(yù)知的錯(cuò)誤 L1 = range(a, b) lock.release() q.put(L1) print L1 if __name__ == '__main__': q = Queue() lock = Lock() p1 = Process(target=add, args=(q, lock, 20, 30)) p2 = Process(target=add, args=(q, lock, 30, 40)) p1.start() p2.start() p1.join() p2.join() L += q.get() + q.get() print L
執(zhí)行結(jié)果:
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
下面介紹Queue的常用方法:
- 定義時(shí)可用q = Queue(maxsize = 10)來(lái)指定隊(duì)列的長(zhǎng)度,默認(rèn)時(shí)或maxsize值小于1時(shí)隊(duì)列為無(wú)限長(zhǎng)度。
- q.put(item)方法向隊(duì)列放入元素,其還有一個(gè)可選參數(shù)block,默認(rèn)為True,此時(shí)若隊(duì)列已滿則會(huì)阻塞等待,直到有空閑位置。而當(dāng)black值為 False,在該情況下就會(huì)拋出Full異 常
- Queue是不可迭代的對(duì)象,不能通過(guò)for循環(huán)取值,取值時(shí)每次調(diào)用q.get()方法。同樣也有可選參數(shù)block,默認(rèn)為True,若此時(shí)隊(duì)列為空則會(huì)阻塞等待。而black值為False時(shí),在該情況下就會(huì)拋出Empty異常
- Queue.qsize() 返回隊(duì)列的大小
- Queue.empty() 如果隊(duì)列為空,返回True,反之False
- Queue.full() 如果隊(duì)列滿了,返回True,反之False
- Queue.get([block[, timeout]]) 獲取隊(duì)列,timeout等待時(shí)間Queue.get_nowait() 相當(dāng)Queue.get(False) 非阻塞 Queue.put(item) 寫入隊(duì)列,timeout等待時(shí)間
- Queue.put_nowait(item) 相當(dāng)Queue.put(item, False)
Pipe
Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過(guò)mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認(rèn)為雙向)。雙向Pipe允許兩端的進(jìn)即可以發(fā)送又可以接受;單向的Pipe只允許前面的端口用于接收,后面的端口用于發(fā)送。
下面給出例子:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True: print "proc2 recieve:", pipe.recv() if __name__ == "__main__": pipe = Pipe() p1 = Process(target=proc1, args=(pipe[0],)) p2 = Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join(2) #限制執(zhí)行時(shí)間最多為2秒 print '\nend all processes.'
執(zhí)行結(jié)果如下:
proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.
當(dāng)?shù)诙休敵龊螅驗(yàn)楣艿乐袥](méi)有數(shù)據(jù)傳來(lái),Proc2處于阻塞狀態(tài),2秒后被強(qiáng)制結(jié)束。
以下是單向管道的例子,注意pipe[0],pipe[1]的分配。
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True: print "proc2 recieve:", pipe.recv() if __name__ == "__main__": pipe = Pipe(duplex=False) p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]為發(fā)送端 p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]為接收端 p1.start() p2.start() p1.join() p2.join(2) # 限制執(zhí)行時(shí)間最多為2秒 print '\nend all processes.'
執(zhí)行結(jié)果同上。
強(qiáng)大的Manage
Queue和Pipe實(shí)現(xiàn)的數(shù)據(jù)共享方式只支持兩種結(jié)構(gòu) Value 和 Array。Python中提供了強(qiáng)大的Manage專門用來(lái)做數(shù)據(jù)共享,其支持的類型非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等
其用法如下:
from multiprocessing import Process, Manager def func(dt, lt): for i in range(10): key = 'arg' + str(i) dt[key] = i * i lt += range(11, 16) if __name__ == "__main__": manager = Manager() dt = manager.dict() lt = manager.list() p = Process(target=func, args=(dt, lt)) p.start() p.join() print dt, '\n', lt
執(zhí)行結(jié)果:
{‘a(chǎn)rg8': 64, ‘a(chǎn)rg9': 81, ‘a(chǎn)rg0': 0, ‘a(chǎn)rg1': 1, ‘a(chǎn)rg2': 4, ‘a(chǎn)rg3': 9, ‘a(chǎn)rg4': 16, ‘a(chǎn)rg5': 25, ‘a(chǎn)rg6': 36, ‘a(chǎn)rg7': 49}
[11, 12, 13, 14, 15]
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python查詢Mysql時(shí)返回字典結(jié)構(gòu)的代碼
MySQLdb默認(rèn)查詢結(jié)果都是返回tuple,輸出時(shí)候不是很方便,必須按照0,1這樣讀取,無(wú)意中在網(wǎng)上找到簡(jiǎn)單的修改方法,就是傳遞一個(gè)cursors.DictCursor就行2012-06-06Python中定時(shí)任務(wù)框架APScheduler的快速入門指南
APScheduler是基于Quartz的一個(gè)Python定時(shí)任務(wù)框架,實(shí)現(xiàn)了Quartz的所有功能,使用起來(lái)十分方便。下面這篇文章主要跟大家介紹了Python中定時(shí)任務(wù)框架APScheduler的快速入門指南,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-07-07python機(jī)器學(xué)習(xí)庫(kù)常用匯總
這篇文章主要為大家匯總了常用python機(jī)器學(xué)習(xí)庫(kù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11Python?tkinter中l(wèi)abel控件動(dòng)態(tài)改變值問(wèn)題
這篇文章主要介紹了Python?tkinter中l(wèi)abel控件動(dòng)態(tài)改變值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01matplotlib畫(huà)混淆矩陣與正確率曲線的實(shí)例代碼
混淆矩陣也稱誤差矩陣,是表示精度評(píng)價(jià)的一種標(biāo)準(zhǔn)格式,下面這篇文章主要給大家介紹了關(guān)于matplotlib畫(huà)混淆矩陣與正確率曲線的相關(guān)資料,需要的朋友可以參考下2021-06-06django-rest-swagger對(duì)API接口注釋的方法
今天小編就為大家分享一篇django-rest-swagger對(duì)API接口注釋的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08一文掌握python中的__init__的意思及使用場(chǎng)景分析
__init__是構(gòu)造方法,誰(shuí)調(diào)用,表示誰(shuí)(更直觀的理解就是類的方法中,誰(shuí)調(diào)用,表示誰(shuí),見(jiàn)下面第一個(gè)代碼)??!并不是必選項(xiàng),也就是說(shuō)在類中,這個(gè)不是必須用的,那什么場(chǎng)景需要用到,什么場(chǎng)景不需要用到呢,感興趣的朋友跟隨小編一起看看吧2023-02-02python3中使用__slots__限定實(shí)例屬性操作分析
這篇文章主要介紹了python3中使用__slots__限定實(shí)例屬性操作,結(jié)合實(shí)例形式分析了Python3定義類實(shí)例綁定屬性,以及使用__slots__限定實(shí)例屬性的相關(guān)操作技巧,需要的朋友可以參考下2020-02-02