Python通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信詳情
一、前言
在多進(jìn)程中,每個進(jìn)程之間是什么關(guān)系呢?其實(shí)每個進(jìn)程都有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他記錄其運(yùn)行狀態(tài)的輔助數(shù)據(jù)。下面通過一個例子,驗(yàn)證一下進(jìn)程之間能否直接共享信息。
定義一個全局變量g_num,分別創(chuàng)建2個子進(jìn)程對g_num執(zhí)行不同的操作,并輸出操作后的結(jié)果。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process def plus(): print("-------子進(jìn)程1開始----------") global g_num g_num += 50 print("g_num is %d" % g_num) print("-------子進(jìn)程1結(jié)束----------") def minus(): print("-------子進(jìn)程2開始----------") global g_num g_num -= 50 print("g_num is %d" % g_num) print("-------子進(jìn)程2結(jié)束----------") g_num = 100 # 定義一個全局變量 if __name__ == "__main__": print("-------主進(jìn)程開始----------") print("g_num is %d" % g_num) p1 = Process(target=plus) # 實(shí)例化進(jìn)程p1 p2 = Process(target=minus) # 實(shí)例化進(jìn)程p2 p1.start() # 開啟p1進(jìn)程 p2.start() # 開啟p2進(jìn)程 p1.join() # 等待p1進(jìn)程結(jié)束 p2.join() # 等待p2進(jìn)程結(jié)束 print("-------主進(jìn)程結(jié)束----------")
運(yùn)行結(jié)果如圖所示:
上述代碼中,分別創(chuàng)建了2個子進(jìn)程,一個子進(jìn)程中令g_num加上50,另一個子進(jìn)程令g_num減去50。但是從運(yùn)行結(jié)果可以看出來,g_num在父進(jìn)程和2個子進(jìn)程中的初始值都是100。也就是全局變量g_num在一個進(jìn)程中的結(jié)果,沒有傳到下一個進(jìn)程中,即進(jìn)程之間沒有共享信息。
進(jìn)程間示意圖如圖所示:
要如何才能實(shí)現(xiàn)進(jìn)程間的通信呢?Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue(隊(duì)列)、Pipes(管道)等多種方式來交換數(shù)據(jù)。本文將講解通過隊(duì)列(Queue)來實(shí)現(xiàn)進(jìn)程間的通信。
二、隊(duì)列簡介
隊(duì)列(Queue)就是模型仿現(xiàn)實(shí)中的排隊(duì)。例如學(xué)生在食堂排隊(duì)買飯。新來的學(xué)生排隊(duì)到隊(duì)伍最后,最前面的學(xué)生買完飯走開,后面的學(xué)生跟上。
可以看出隊(duì)列有兩個特點(diǎn):
- 新來的學(xué)生都排在隊(duì)尾。
- 最前的學(xué)生完成后離隊(duì),后面一個跟上。
根據(jù)以上特點(diǎn),可以歸納出隊(duì)列的結(jié)構(gòu)如圖所示:
三、多進(jìn)程隊(duì)列的使用
進(jìn)程之間有時需要通信,操作系統(tǒng)提供了很多機(jī)制來實(shí)現(xiàn)進(jìn)程間的通信??梢允褂胢ultiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。Queue本身是一個消息隊(duì)列程序,下面介紹一下Queue的使用。
初始化Queue()對象時(例如:q=Queue(num)),若括號中沒有指定最大可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接收的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)。
Queue的常用方法如下:
Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量。Queue.empty():如果隊(duì)列為空,返回True;返之返回False。Queue.full():如果隊(duì)列滿了,返回True;反之返回False。Queue.get(block[,timeout]):獲取隊(duì)列中的一條信息,然后將其從隊(duì)列中移除,block默認(rèn)值為True。
如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列為空,此時程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息隊(duì)列讀到消息為止。如果設(shè)置了timeout,則會等待timeout秒,若還沒有讀取任何消息,則拋出“Queue.Empty”異常。
如果block值為False,消息隊(duì)列為空,則會立刻拋出“Queue.Empty”異常。
Queue.get_nowait():相當(dāng)于Queue.get(False)。Queue.put(item,[block[,timeout]]):將item消息寫入隊(duì)列,block默認(rèn)值為True。
如果block使用默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息隊(duì)列如果已經(jīng)沒有空間可以寫入,此時程序?qū)⒈蛔枞?停在寫入狀態(tài)),直到從消息隊(duì)列騰出空間為止,如果設(shè)置了timeout,則會等待timeout秒,若還沒有空間,則拋出“Queue.Full”異常。
如果block值為False,消息隊(duì)列沒有空間可寫入,則會立刻拋出“Queue.Full”異常
Queue.put_nowait(item):相當(dāng)Queue.put(item,False)。
下面,通過一個例子學(xué)習(xí)一下如何使用processing.Queue。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Queue if __name__ == "__main__": q = Queue(3) q.put("消息1") q.put("消息2") print(q.full()) # 返回False q.put("消息3") print(q.full()) # 返回True # 因?yàn)橄㈥?duì)列已滿,下面的try都會拋出異常 # 第一個try會等待2秒再拋出異常,第二個try會立刻拋出異常 try: q.put("消息4", True, 2) except: print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息隊(duì)列已滿,現(xiàn)有消息數(shù)量:%s" % q.qsize()) # 讀取消息時,先判斷消息隊(duì)列是否為空,再讀取 if not q.empty(): print("-----從隊(duì)列中獲取消息-------") for i in range(q.qsize()): print(q.get_nowait()) # 先判讀消息隊(duì)列是否已滿,再寫入: if not q.full(): q.put_nowait("消息4")
運(yùn)行結(jié)果如圖所示:
四、使用隊(duì)列在進(jìn)程間通信
我們知道使用multiprocessing.Process可以創(chuàng)建多進(jìn)程,使用multiprocessing.Queue可以實(shí)現(xiàn)隊(duì)列的操作。接下來,通過一個示例結(jié)合Process和Queue實(shí)現(xiàn)進(jìn)程間的通信。
創(chuàng)建2個子進(jìn)程,一個子進(jìn)程負(fù)責(zé)向隊(duì)列中寫入數(shù)據(jù),另外一個子進(jìn)程負(fù)責(zé)從隊(duì)列中讀取數(shù)據(jù)。為了保證能夠正確從隊(duì)列中讀取數(shù)據(jù),設(shè)置讀取數(shù)據(jù)的進(jìn)程等待時間為2秒。如果2秒后乃然無法讀取數(shù)據(jù),則拋出異常。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process, Queue import time # 向隊(duì)列中寫入數(shù)據(jù) def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("寫入:%s" % message) # 從隊(duì)列中讀取數(shù)據(jù) def read_task(q): time.sleep(1) # 休眠1秒 while not q.empty(): print("讀?。?s" % q.get(True, 2)) # 等待2秒中,如果沒有讀取到任何信息,則拋出異常 if __name__ == "__main__": print("--------父進(jìn)程開始---------") q = Queue() # 父進(jìn)程創(chuàng)建Queue,并傳給各個子進(jìn)程 pw = Process(target=write_task, args=(q,)) # 實(shí)例化寫入隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列 pr = Process(target=read_task, args=(q,)) # 實(shí)例化讀取隊(duì)列的子進(jìn)程,并傳遞給隊(duì)列 pw.start() # 啟動子進(jìn)程pw,寫入 pr.start() # 啟動子進(jìn)程pr,讀取 pw.join() # 等待pw結(jié)束 pr.join() # 等待pr結(jié)束 print("-------父進(jìn)程結(jié)束-----------")
運(yùn)行結(jié)果如下:
到此這篇關(guān)于Python通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通信詳情的文章就介紹到這了,更多相關(guān)Python進(jìn)程間通信 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python和websocket構(gòu)建實(shí)時日志跟蹤器的步驟
這篇文章主要介紹了python和websocket構(gòu)建實(shí)時日志跟蹤器的步驟,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-04-04python使用 multiprocessing 多進(jìn)程處理批量數(shù)據(jù)的示例代碼
這篇文章主要介紹了使用 multiprocessing 多進(jìn)程處理批量數(shù)據(jù)的示例代碼,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-09-09JupyterLab遠(yuǎn)程密碼訪問實(shí)現(xiàn)
本文主要介紹了JupyterLab遠(yuǎn)程密碼訪問實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02Python matplotlib畫圖與中文設(shè)置操作實(shí)例分析
這篇文章主要介紹了Python matplotlib畫圖與中文設(shè)置操作,結(jié)合實(shí)例形式分析了Python使用matplotlib進(jìn)行圖形繪制及中文設(shè)置相關(guān)操作技巧,需要的朋友可以參考下2019-04-04