Python語法學習之進程間的通信方式
什么是進程的通信
這里舉一個例子接介紹通信的機制:通信 一詞大家并不陌生,比如一個人要給他的女友打電話。當建立了通話之后,在這個通話的過程中就是建立了一條隱形的 隊列 (記住這個詞)。此時這個人就會通過對話的方式不停的將信息告訴女友,而這個人的女友也是在傾聽著。(嗯…我個人覺得大部分情況下可能是反著來的)。
這里可以將他們兩個人比作是兩個進程,"這個人"的進程需要將信息發(fā)送給"女友"的進程,就需要一個隊列的幫助。而女友需要不停的接收隊列的信息,可以做一些其他的事情,所以兩個進程之間的通信主要依賴于隊列。
這個隊列可以支持發(fā)送消息與接收消息,“這個人"負責發(fā)送消息,反之"女友” 負責的是接收消息。
既然隊列才是重點,那么來看一下隊列要如何創(chuàng)建。
隊列的創(chuàng)建 - multiprocessing
依然使用 multiprocessing 模塊,調用該模塊的 Queue 函數來實現隊列的創(chuàng)建。
函數名 | 介紹 | 參數 | 返回值 |
---|---|---|---|
Queue | 隊列的創(chuàng)建 | mac_count | 隊列對象 |
Queue 函數功能介紹:調用 Queue 可以創(chuàng)建隊列;它有一個參數 mac_count 代表隊列最大可以創(chuàng)建多少信息,如果不傳默認是無限長度。實例化一個隊列對象之后,需要操作這個隊列的對象進行放入與取出數據。
進程之間通信的方法
函數名 | 介紹 | 參數 | 返回值 |
---|---|---|---|
put | 將消息放入隊列 | message | 無 |
get | 獲取隊列消息 | 無 | str |
put 函數功能介紹:將數據傳入。它有一個參數 message ,是一個字符串類型。
get 函數功能介紹:用來接收隊列中的數據。(其實這里就是一個常用的json場景,有很多的數據傳輸都是 字符串 的,隊列的插入與獲取就是使用的字符串,所以 json 就非常適用這個場景。)
接下來就來練習一下 隊列的使用 。
進程間的通信 - 隊列演示案例
代碼示例如下:
# coding:utf-8 import json import multiprocessing class Work(object): # 定義一個 Work 類 def __init__(self, queue): # 構造函數傳入一個 '隊列對象' --> queue self.queue = queue def send(self, message): # 定義一個 send(發(fā)送) 函數,傳入 message # [這里有個隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數、類、集合等依然會報錯] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊列實例化對象將 message 發(fā)送出去 def receive(self): # 定義一個 receive(接收) 函數,不需傳入參數,但是因為接收是一個源源不斷的過程,所以需要使用 while 循環(huán) while 1: result = self.queue.get() # 獲取 '隊列對象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個字符串,所以要進程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send.start() receive.start()
使用隊列建立進程間通信遇到的異常
但是這里會出現一個 報錯,如下圖:
報錯截圖示例如下:
這里的報錯提示是 文件沒有被發(fā)現的意思 。其實這里是我們使用 隊列做 put() 和 get()的時候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關心造成這個錯誤的具體原因,要解決這個問題其實也很簡單。
FileNotFoundError: [Errno 2] No such file or directory 異常的解決
我們只需要給 send 或者 receive 其中一個子進程添加 join 阻塞進程即可,理論上如此。但是我們的 receive子進程是一個 while循環(huán),它會一直執(zhí)行,所以只需要給 send 子進程加上一個 join 即可。
解決示意圖如下:
PS:雖然解決了報錯問題,但是程序沒有正常退出。
實際上由于我們的 receive 進程是個 while循環(huán),并不知道要處理到什么時候,沒有辦法立刻終止。所以我們需要在 receive 進程 使用 terminate() 函數終結接收端。
運行結果如下:
批量給 send 函數加入數據
新建一個函數,寫入 for循環(huán) 模擬批量添加要發(fā)送的消息
然后再給這個模擬批量發(fā)送數據的函數添加一個線程。
示例代碼如下:
# coding:utf-8 import json import time import multiprocessing class Work(object): # 定義一個 Work 類 def __init__(self, queue): # 構造函數傳入一個 '隊列對象' --> queue self.queue = queue def send(self, message): # 定義一個 send(發(fā)送) 函數,傳入 message # [這里有個隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數、類、集合等依然會報錯] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊列實例化對象將 message 發(fā)送出去 def send_all(self): # 定義一個 send_all(發(fā)送)函數,然后通過for循環(huán)模擬批量發(fā)送的 message for i in range(20): self.queue.put('第 {} 次循環(huán),發(fā)送的消息為:{}'.format(i, i)) time.sleep(1) def receive(self): # 定義一個 receive(接收) 函數,不需傳入參數,但是因為接收是一個源源不斷的過程,所以需要使用 while 循環(huán) while 1: result = self.queue.get() # 獲取 '隊列對象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個字符串,所以要進程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send_all = multiprocessing.Process(target=work.send_all,) send_all.start() # 這里因為 send 只執(zhí)行了1次,然后就結束了。而 send_all 卻要循環(huán)20次,它的執(zhí)行時間是最長的,信息也是發(fā)送的最多的 send.start() receive.start() # send.join() # 使用 send 的阻塞會造成 send_all 循環(huán)還未結束 ,receive.terminate() 函數接收端就會終結。 send_all.join() # 所以我們只需要阻塞最長使用率的進程就可以了 receive.terminate()
運行結果如下:
從上圖中我們可以看到 send 與 send_all 兩個進程都可以通過 queue這個實例化的 Queue 對象發(fā)送消息,同樣的 receive接收函數也會將兩個進程傳入的 message 打印輸出出來。
小節(jié)
該章節(jié)我們通過隊列的方式實現了進程間通信的方法,并且了解了隊列的使用方法。一個隊列中,有一端(這里我們演示的是 send端)通過 put方法實現添加相關的信息,另一端使用 get 方法獲取相關的信息;兩個進程相互配合達到一個進程通信的效果。
其實進程之間的通信不僅僅只有隊列這一種方式,感興趣的話還可以通過 管道、信號量、共享內存的方式來實現??梢宰孕型卣挂幌隆?/p>
進程間通信的其他方式 - 補充
python提供了多種進程通信的方式,包括信號,管道,消息隊列,信號量,共享內存,socket等
主要Queue和Pipe這兩種方式,Queue用于多個進程間實現通信,Pipe是兩個進程的通信。
1.管道:分為匿名管道和命名管道
匿名管道:在內核中申請一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權利,一般使用fock函數實現父子進程的通信
命名管道:在內存中申請一塊固定大小的緩沖區(qū),程序擁有寫入和讀取的權利,沒有血緣關系的進程也可以進程間通信
特點:面向字節(jié)流;生命周期隨內核;自帶同步互斥機制;半雙工,單向通信,兩個管道實現雙向通信
2.消息隊列:在內核中創(chuàng)建一個隊列,隊列中每個元素是一個數據報,不同的進程可以通過句柄去訪問這個隊列。消息隊列提供了一個從一個進程向另外一個進程發(fā)送一塊數據的方法。每個數據塊都被認為是有一個類型,接收者進程接收的數據塊可以有不同的類型。消息隊列也有管道一樣的不足,就是每個消息的最大長度是有上限的,每個消息隊列的總的字節(jié)數是有上限的,系統(tǒng)上消息隊列的總數也有一個上限
特點:消息隊列可以被認為是一個全局的一個鏈表,鏈表節(jié)點中存放著數據報的類型和內容,有消息隊列的標識符進行標記;消息隊列允許一個或多個進程寫入或讀取消息;消息隊列的生命周期隨內核;消息隊列可實現雙向通信
3.信號量:在內核中創(chuàng)建一個信號量集合(本質上是數組),數組的元素(信號量)都是1,使用P操作進行-1,使用V操作+1
P(sv):如果sv的值大于零,就給它減1;如果它的值為零,就掛起該程序的執(zhí)行
V(sv):如果有其他進程因等待sv而被掛起,就讓它恢復運行,如果沒有進程因等待sv而掛起,就給它加1
PV操作用于同一個進程,實現互斥;PV操作用于不同進程,實現同步
功能:對臨界資源進行保護
4.共享內存:將同一塊物理內存一塊映射到不同的進程的虛擬地址空間中,實現不同進程間對同一資源的共享。共享內存可以說是最有用的進程間通信方式,也是最快的IPC形式
特點:不同從用戶態(tài)到內核態(tài)的頻繁切換和拷貝數據,直接從內存中讀取就可以;共享內存是臨界資源,所以需要操作時必須要保證原子性。使用信號量或者互斥鎖都可以.
以上就是Python語法學習之進程間的通信方式的詳細內容,更多關于Python進程通信方式的資料請關注腳本之家其它相關文章!
相關文章
15個應該掌握的Jupyter Notebook使用技巧(小結)
這篇文章主要介紹了15個應該掌握的Jupyter Notebook使用技巧(小結),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09