python實(shí)現(xiàn)多進(jìn)程通信實(shí)例分析
操作系統(tǒng)會(huì)為每一個(gè)創(chuàng)建的進(jìn)程分配一個(gè)獨(dú)立的地址空間,不同進(jìn)程的地址空間是完全隔離的,因此如果不加其他的措施,他們完全感覺(jué)不到彼此的存在。那么進(jìn)程之間怎么進(jìn)行通信?他們之間的關(guān)聯(lián)是怎樣的?實(shí)現(xiàn)原理是什么?本文就來(lái)借助Python簡(jiǎn)單的聊一下進(jìn)程之間的通信?還是那句話,原理是相同的,希望能透過(guò)具體的例子來(lái)體會(huì)一下本質(zhì)的東西。
下面盡量以簡(jiǎn)單的方式介紹一下每一類通信方式,具體的細(xì)節(jié)可以參照文檔使用;
1. 管道
先來(lái)看一下最簡(jiǎn)單、古老的一種IPC:管道。通常指的是無(wú)名管道,本質(zhì)上可以看做一種文件,只存在于內(nèi)存當(dāng)中,不會(huì)存盤(pán)。不同進(jìn)程通過(guò)系統(tǒng)提供的接口來(lái)向管道中讀取或者寫(xiě)入數(shù)據(jù)。
也就是說(shuō)我們通過(guò)這樣一個(gè)中間介質(zhì)為進(jìn)程提供交流的方式。無(wú)名管道的局限在于一般只用于有直接關(guān)聯(lián)關(guān)系的父子進(jìn)程。下面通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)看一下其用法。
from multiprocessing import Process, Pipe def pstart(pname, conn): conn.send("Data@subprocess") print(conn.recv()) # Data@parentprocess if __name__ == '__main__': conn1, conn2 = Pipe(True) sub_proc = Process(target=pstart, args=('subprocess', conn2,)) sub_proc.start() print (conn1.recv()) # Data@subprocess conn1.send("Data@parentprocess") sub_proc.join()
管道通信三步曲:
- 創(chuàng)建Pipe,得到兩個(gè)connection對(duì)象conn1和conn2;
- 父進(jìn)程持有conn1,將conn2傳遞給子進(jìn)程;
- 父子進(jìn)程通過(guò)對(duì)持有的connection對(duì)象進(jìn)行send和recv操作以進(jìn)行數(shù)據(jù)傳遞和接受;
上面我們創(chuàng)建的是全雙工管道,也可以創(chuàng)建半雙工管道,具體使用可以參照官網(wǎng)描述:
Returns a pair
(conn1, conn2)
ofConnection
objects representing the ends of a pipe.If duplex is
True
(the default) then the pipe is bidirectional. If duplex isFalse
then the pipe is unidirectional:conn1
can only be used for receiving messages andconn2
can only be used for sending messages.
2. 具名管道(FIFO)
上面介紹的管道主要用于有直接關(guān)系的進(jìn)程,局限性比較大。下面來(lái)看一下可以在任意進(jìn)程間進(jìn)行通信的具名管道。
由于window平臺(tái)上os模塊沒(méi)有mkfifo屬性,因此這個(gè)例子只能在linux上運(yùn)行(測(cè)試環(huán)境 CentOS 7, Python 2.7.5):
#!/usr/bin/python import os, time from multiprocessing import Process input_pipe = "./pipe.in" output_pipe = "./pipe.out" def consumer(): if os.path.exists(input_pipe): os.remove(input_pipe) if os.path.exists(output_pipe): os.remove(output_pipe) os.mkfifo(output_pipe) os.mkfifo(input_pipe) in1 = os.open(input_pipe, os.O_RDONLY) # read from pipe.in out1 = os.open(output_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR) while True: read_data = os.read(in1, 1024) print("received data from pipe.in: %s @consumer" % read_data) if len(read_data) == 0: time.sleep(1) continue if "exit" in read_data: break os.write(out1, read_data) os.close(in1) os.close(out1) def producer(): in2 = None out2 = os.open(input_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR) for i in range(1, 4): msg = "msg " + str(i) len_send = os.write(out2, msg) print("------product msg: %s by producer------" % msg) if in2 is None: in2 = os.open(output_pipe, os.O_RDONLY) # read from pipe.out data = os.read(in2, 1024) if len(data) == 0: break print("received data from pipe.out: %s @producer" % data) time.sleep(1) os.write(out2, 'exit') os.close(in2) os.close(out2) if __name__ == '__main__': pconsumer = Process(target=consumer, args=()) pproducer = Process(target=producer, args=()) pconsumer.start() time.sleep(0.5) pproducer.start() pconsumer.join() pproducer.join()
運(yùn)行流程如下:
每一輪的過(guò)程如下:
- producer進(jìn)程往pipe.in文件中寫(xiě)入消息數(shù)據(jù);
- consumer進(jìn)程從pipe.in文件中讀入消息數(shù)據(jù);
- consumer進(jìn)程往pipe.out文件中寫(xiě)入回執(zhí)消息數(shù)據(jù);
- producer進(jìn)程從pipe.out文件中讀出回執(zhí)消息數(shù)據(jù);
結(jié)果如下:
[shijun@localhost python]$ python main.py ------product msg: msg 1 by producer------ received data from pipe.in: msg 1 @consumer received data from pipe.out: msg 1 @producer ------product msg: msg 2 by producer------ received data from pipe.in: msg 2 @consumer received data from pipe.out: msg 2 @producer ------product msg: msg 3 by producer------ received data from pipe.in: msg 3 @consumer received data from pipe.out: msg 3 @producer received data from pipe.in: exit @consumer
兩個(gè)進(jìn)程沒(méi)有直接的關(guān)系,每個(gè)進(jìn)程有一個(gè)讀文件和寫(xiě)文件,如果兩個(gè)進(jìn)程的讀寫(xiě)文件是關(guān)聯(lián)的,就可以進(jìn)行通信。
3. 消息隊(duì)列(Queue)
進(jìn)程之間通過(guò)向隊(duì)列中添加數(shù)據(jù)或者從隊(duì)列中獲取數(shù)據(jù)來(lái)進(jìn)行消息數(shù)據(jù)的傳遞。下面是一個(gè)簡(jiǎn)單的例子。
from multiprocessing import Process, Queue import time def producer(que): for product in ('Orange', 'Apple', ''): print('put product: %s to queue' % product) que.put(product) time.sleep(0.5) res = que.get() print('consumer result: %s' % res) def consumer(que): while True: product = que.get() print('get product:%s from queue' % product) que.put('suc!') time.sleep(0.5) if not product: break if __name__ == '__main__': que = Queue(1) p = Process(target=producer, args=(que,)) c = Process(target=consumer, args=(que,)) p.start() c.start() p.join() c.join()
這個(gè)例子比較簡(jiǎn)單,queue的具體用法可以參考一下官網(wǎng)。
結(jié)果:
put product: Orange to queue consumer result: suc! put product: Apple to queue consumer result: suc! put product: to queue consumer result: suc! get product:Orange from queue get product:Apple from queue get product: from queue
這里有幾點(diǎn)需要注意下:
- 可以指定隊(duì)列的容量,如果超出容量會(huì)有異常:raise Full;
- 默認(rèn)put和get均會(huì)阻塞當(dāng)前進(jìn)程;
- 如果put沒(méi)有設(shè)置成阻塞,那么可能自己從隊(duì)列中取出自己放入的數(shù)據(jù);
4. 共享內(nèi)存
共享內(nèi)存是一種常用的,高效的進(jìn)程之間的通信方式,為了保證共享內(nèi)存的有序訪問(wèn),需要對(duì)進(jìn)程采取額外的同步措施。
下面的這個(gè)例子僅僅簡(jiǎn)單的演示了Python中如何在不同進(jìn)程間使用共享內(nèi)存進(jìn)行通信的。
from multiprocessing import Process import mmap import contextlib import time def writer(): with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_WRITE)) as mem: for share_data in ("Hello", "Alpha_Panda"): mem.seek(0) print('Write data:== %s == to share memory!' % share_data) mem.write(str.encode(share_data)) mem.flush() time.sleep(0.5) def reader(): while True: invalid_byte, empty_byte = str.encode('\x00'), str.encode('') with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_READ)) as mem: share_data = mem.read(1024).replace(invalid_byte, empty_byte) if not share_data: """ 當(dāng)共享內(nèi)存沒(méi)有有效數(shù)據(jù)時(shí)結(jié)束reader """ break print("Get data:== %s == from share memory!" % share_data.decode()) time.sleep(0.5) if __name__ == '__main__': p_reader = Process(target=reader, args=()) p_writer = Process(target=writer, args=()) p_writer.start() p_reader.start() p_writer.join() p_reader.join()
執(zhí)行結(jié)果:
Write data:== Hello == to share memory! Write data:== Alpha_Panda == to share memory! Get data:== Hello == from share memory! Get data:== Alpha_Panda == from share memory!
下面簡(jiǎn)單的來(lái)說(shuō)明一下共享內(nèi)存的原理;
進(jìn)程虛擬地址到物理地址的一個(gè)映射關(guān)如下:
上面這個(gè)圖已經(jīng)很明白的展示了共享內(nèi)存的原理。
左邊是正常情況下,不同進(jìn)程的線性地址空間被映射到不同的物理內(nèi)存頁(yè),這樣不管其他進(jìn)程怎么修改物理內(nèi)存,都不會(huì)影響到其他進(jìn)程;
右邊表示的是進(jìn)程共享內(nèi)存的情況下,不同進(jìn)程的部分線性地址會(huì)被映射到同一物理頁(yè),一個(gè)進(jìn)程對(duì)這個(gè)物理頁(yè)的修改,會(huì)對(duì)另一個(gè)進(jìn)程立即可見(jiàn);
當(dāng)然潛在的問(wèn)題就是要采取進(jìn)程同步措施,也就是對(duì)共享內(nèi)存的訪問(wèn)必須是互斥的。這個(gè)可以借助信號(hào)量來(lái)實(shí)現(xiàn)。
5. socket通信
最后再來(lái)介紹一種可以跨主機(jī)的進(jìn)程間通信:socket。
懂網(wǎng)絡(luò)編程的人,對(duì)這個(gè)應(yīng)該都比較熟悉。socket不僅可以跨主機(jī)進(jìn)行通信,甚至有時(shí)候可以使用socket在同一主機(jī)的不同進(jìn)程間進(jìn)行通信。
這部分代碼比較簡(jiǎn)單常見(jiàn),這里僅僅使用流程圖來(lái)表示一下socket通信的流程及相關(guān)接口。
上圖表示客戶端上某進(jìn)程使用socket和服務(wù)器上監(jiān)聽(tīng)程序進(jìn)行socket通信的一個(gè)流程。
小結(jié)
到這里關(guān)于常見(jiàn)的進(jìn)程間通信相關(guān)的概念和實(shí)例均簡(jiǎn)單介紹了一下。希望本文能讓你對(duì)進(jìn)程間通信有一個(gè)更深入的理解和認(rèn)識(shí)。
結(jié)合之前幾篇介紹線程、進(jìn)程概念及線程間同步的一些措施的介紹,相信應(yīng)該對(duì)線程和進(jìn)程相關(guān)概念有一個(gè)簡(jiǎn)單清晰的認(rèn)識(shí)了。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
windows、linux下打包Python3程序詳細(xì)方法
這篇文章主要介紹了windows、linux下打包Python3程序詳細(xì)方法,需要的朋友可以參考下2020-03-03Python+OpenCV圖像處理之直方圖統(tǒng)計(jì)
直方圖就是對(duì)圖像的另外一種解釋,它描述了整幅圖像的灰度分布。通過(guò)直方圖我們可以對(duì)圖像的亮度、灰度分布、對(duì)比度等有了一個(gè)直觀的認(rèn)識(shí)。本文將為大家詳細(xì)介紹一下如何通過(guò)OpenCV實(shí)現(xiàn)直方圖統(tǒng)計(jì),感興趣的可以了解一下2021-12-12python 通過(guò)SSHTunnelForwarder隧道連接redis的方法
今天小編就為大家分享一篇python 通過(guò)SSHTunnelForwarder隧道連接redis的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02Python批量添加水印的優(yōu)雅實(shí)現(xiàn)與進(jìn)階
在日常圖像處理中,為圖片添加水印是一項(xiàng)常見(jiàn)任務(wù),有多種方法和工具可供選擇,本文將專注于使用Python語(yǔ)言結(jié)合PIL庫(kù)批量添加水印,感興趣的可以了解下2023-12-12Python鍵盤(pán)輸入轉(zhuǎn)換為列表的實(shí)例
今天小編就為大家分享一篇Python鍵盤(pán)輸入轉(zhuǎn)換為列表的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06Python繪圖系統(tǒng)之繪制散點(diǎn)圖,極坐標(biāo)和子圖
這篇文章主要為大家詳細(xì)介紹了如何基于Python實(shí)現(xiàn)一個(gè)繪圖系統(tǒng),可以支持繪制散點(diǎn)圖,極坐標(biāo)和子圖,文中的示例代碼講解詳細(xì),感興趣的可以了解下2023-09-09利用OpenCV中對(duì)圖像數(shù)據(jù)進(jìn)行64F和8U轉(zhuǎn)換的方式
這篇文章主要介紹了利用OpenCV中對(duì)圖像數(shù)據(jù)進(jìn)行64F和8U轉(zhuǎn)換的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06Python實(shí)現(xiàn)類的創(chuàng)建與使用方法示例
這篇文章主要介紹了Python實(shí)現(xiàn)類的創(chuàng)建與使用方法,結(jié)合簡(jiǎn)單計(jì)算器功能實(shí)例分析了Python類的定義與使用方法,需要的朋友可以參考下2017-07-07python實(shí)現(xiàn)mask矩陣示例(根據(jù)列表所給元素)
這篇文章主要介紹了python實(shí)現(xiàn)mask矩陣示例(根據(jù)列表所給元素),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07