Python實現(xiàn)進程同步和通信的方法
Python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數(shù),Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發(fā)執(zhí)行的轉換。multiprocessing支持子進程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
引例:
如之前創(chuàng)建多進程的例子
# -*- coding:utf-8 -*- from multiprocessing import Process,Pool import os,time def run_proc(name): ##定義一個函數(shù)用于進程調用 for i in range(5): time.sleep(0.2) #休眠0.2秒 print 'Run child process %s (%s)' % (name, os.getpid()) #執(zhí)行一次該函數(shù)共需1秒的時間 if __name__ =='__main__': #執(zhí)行主進程 print 'Run the main process (%s).' % (os.getpid()) mainStart = time.time() #記錄主進程開始的時間 p = Pool(8) #開辟進程池 for i in range(16): #開辟14個進程 p.apply_async(run_proc,args=('Process'+str(i),))#每個進程都調用run_proc函數(shù), #args表示給該函數(shù)傳遞的參數(shù)。 print 'Waiting for all subprocesses done ...' p.close() #關閉進程池 p.join() #等待開辟的所有進程執(zhí)行完后,主進程才繼續(xù)往下執(zhí)行 print 'All subprocesses done' mainEnd = time.time() #記錄主進程結束時間 print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主進程執(zhí)行時間
運行結果:
Run the main process (36652). Waiting for all subprocesses done … Run child process Process0 (36708)Run child process Process1 (36748) Run child process Process3 (36736) Run child process Process2 (36716) Run child process Process4 (36768)
如第3行的輸出,偶爾會出現(xiàn)這樣不如意的輸入格式,為什么呢?
原因是多個進程爭用打印輸出資源的結果。前一個進程為來得急輸出換行符,該資源就切換給了另一個進程使用,致使兩個進程輸出在同一行上,而前一個進程的換行符在下一次獲得資源時才打印輸出。
Lock
為了避免這種情況,需在進程進入臨界區(qū)(使進程進入臨界資源的那段代碼,稱為臨界區(qū))時加鎖。
可以向如下這樣添加鎖后看看執(zhí)行效果:
# -*- coding:utf-8 -*- lock = Lock() #申明一個全局的lock對象 def run_proc(name): global lock #引用全局鎖 for i in range(5): time.sleep(0.2) lock.acquire() #申請鎖 print 'Run child process %s (%s)' % (name, os.getpid()) lock.release() #釋放鎖
Semaphore
Semaphore為信號量機制。當共享的資源擁有多個時,可用Semaphore來實現(xiàn)進程同步。其用法和Lock差不多,s = Semaphore(N),每執(zhí)行一次s.acquire(),該資源的可用個數(shù)將減少1,當資源個數(shù)已為0時,就進入阻塞;每執(zhí)行一次s.release(),占用的資源被釋放,該資源的可用個數(shù)增加1。
多進程的通信(信息交互)
不同進程之間進行數(shù)據(jù)交互,可能不少剛開始接觸多進程的同學會想到共享全局變量的方式,這樣通過向全局變量寫入和讀取信息便能實現(xiàn)信息交互。但是很遺憾,并不能這樣實現(xiàn)。
下面通過例子,加深對那篇文章的理解:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool import os import time L1 = [1, 2, 3] def add(a, b): global L1 L1 += range(a, b) print L1 if __name__ == '__main__': p1 = Process(target=add, args=(20, 30)) p2 = Process(target=add, args=(30, 40)) p1.start() p2.start() p1.join() p2.join() print L1
輸出結果:
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]
該程序的原本目的是想將兩個子進程生成的列表加到全局變量L1中,但用該方法并不能達到想要的效果。既然不能通過全局變量來實現(xiàn)不同進程間的信息交互,那有什么辦法呢。
mutiprocessing為我們可以通過Queue和Pipe來實現(xiàn)進程間的通信。
Queue
按上面的例子通過Queue來實現(xiàn):
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, Lock L = [1, 2, 3] def add(q, lock, a, b): lock.acquire() # 加鎖避免寫入時出現(xiàn)不可預知的錯誤 L1 = range(a, b) lock.release() q.put(L1) print L1 if __name__ == '__main__': q = Queue() lock = Lock() p1 = Process(target=add, args=(q, lock, 20, 30)) p2 = Process(target=add, args=(q, lock, 30, 40)) p1.start() p2.start() p1.join() p2.join() L += q.get() + q.get() print L
執(zhí)行結果:
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
下面介紹Queue的常用方法:
- 定義時可用q = Queue(maxsize = 10)來指定隊列的長度,默認時或maxsize值小于1時隊列為無限長度。
- q.put(item)方法向隊列放入元素,其還有一個可選參數(shù)block,默認為True,此時若隊列已滿則會阻塞等待,直到有空閑位置。而當black值為 False,在該情況下就會拋出Full異 常
- Queue是不可迭代的對象,不能通過for循環(huán)取值,取值時每次調用q.get()方法。同樣也有可選參數(shù)block,默認為True,若此時隊列為空則會阻塞等待。而black值為False時,在該情況下就會拋出Empty異常
- Queue.qsize() 返回隊列的大小
- Queue.empty() 如果隊列為空,返回True,反之False
- Queue.full() 如果隊列滿了,返回True,反之False
- Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間Queue.get_nowait() 相當Queue.get(False) 非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
Pipe
Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認為雙向)。雙向Pipe允許兩端的進即可以發(fā)送又可以接受;單向的Pipe只允許前面的端口用于接收,后面的端口用于發(fā)送。
下面給出例子:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True: print "proc2 recieve:", pipe.recv() if __name__ == "__main__": pipe = Pipe() p1 = Process(target=proc1, args=(pipe[0],)) p2 = Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join(2) #限制執(zhí)行時間最多為2秒 print '\nend all processes.'
執(zhí)行結果如下:
proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.
當?shù)诙休敵龊?,因為管道中沒有數(shù)據(jù)傳來,Proc2處于阻塞狀態(tài),2秒后被強制結束。
以下是單向管道的例子,注意pipe[0],pipe[1]的分配。
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def proc1(pipe): s = 'Hello,This is proc1' pipe.send(s) def proc2(pipe): while True: print "proc2 recieve:", pipe.recv() if __name__ == "__main__": pipe = Pipe(duplex=False) p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]為發(fā)送端 p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]為接收端 p1.start() p2.start() p1.join() p2.join(2) # 限制執(zhí)行時間最多為2秒 print '\nend all processes.'
執(zhí)行結果同上。
強大的Manage
Queue和Pipe實現(xiàn)的數(shù)據(jù)共享方式只支持兩種結構 Value 和 Array。Python中提供了強大的Manage專門用來做數(shù)據(jù)共享,其支持的類型非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等
其用法如下:
from multiprocessing import Process, Manager def func(dt, lt): for i in range(10): key = 'arg' + str(i) dt[key] = i * i lt += range(11, 16) if __name__ == "__main__": manager = Manager() dt = manager.dict() lt = manager.list() p = Process(target=func, args=(dt, lt)) p.start() p.join() print dt, '\n', lt
執(zhí)行結果:
{‘a(chǎn)rg8': 64, ‘a(chǎn)rg9': 81, ‘a(chǎn)rg0': 0, ‘a(chǎn)rg1': 1, ‘a(chǎn)rg2': 4, ‘a(chǎn)rg3': 9, ‘a(chǎn)rg4': 16, ‘a(chǎn)rg5': 25, ‘a(chǎn)rg6': 36, ‘a(chǎn)rg7': 49}
[11, 12, 13, 14, 15]
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
Python中定時任務框架APScheduler的快速入門指南
APScheduler是基于Quartz的一個Python定時任務框架,實現(xiàn)了Quartz的所有功能,使用起來十分方便。下面這篇文章主要跟大家介紹了Python中定時任務框架APScheduler的快速入門指南,需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07Python?tkinter中l(wèi)abel控件動態(tài)改變值問題
這篇文章主要介紹了Python?tkinter中l(wèi)abel控件動態(tài)改變值問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01django-rest-swagger對API接口注釋的方法
今天小編就為大家分享一篇django-rest-swagger對API接口注釋的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08一文掌握python中的__init__的意思及使用場景分析
__init__是構造方法,誰調用,表示誰(更直觀的理解就是類的方法中,誰調用,表示誰,見下面第一個代碼)??!并不是必選項,也就是說在類中,這個不是必須用的,那什么場景需要用到,什么場景不需要用到呢,感興趣的朋友跟隨小編一起看看吧2023-02-02