python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型詳解
Python生產(chǎn)者消費(fèi)者模型
一、消費(fèi)模式
生產(chǎn)者消費(fèi)者模式 是Controlnet網(wǎng)絡(luò)中特有的一種傳輸數(shù)據(jù)的模式。用于兩個CPU之間傳輸數(shù)據(jù),即使是不同類型同一廠家的CPU也可以通過設(shè)置來使用。
二、傳輸原理
- 類似與點(diǎn)對點(diǎn)傳送,又略有不同,一個生產(chǎn)者可以對應(yīng)N個消費(fèi)者,但是一個消費(fèi)者只能對應(yīng)一個生產(chǎn)者;
- 每個生產(chǎn)者消費(fèi)者對應(yīng)一個地址,占一個網(wǎng)絡(luò)節(jié)點(diǎn),屬于預(yù)定性數(shù)據(jù),在網(wǎng)絡(luò)中優(yōu)先級最高;
- 此模式如果在網(wǎng)絡(luò)中設(shè)置過多會影響網(wǎng)絡(luò)傳輸速度,一般用在傳輸比較重要的信息上,比如設(shè)備的啟動、停止、故障、急停等等;
- 在Controlnet網(wǎng)絡(luò)中節(jié)點(diǎn)數(shù)是有限制的,最高節(jié)點(diǎn)數(shù)為99。
- 如果兩個控制器之前建立了多個生產(chǎn)者消費(fèi)者的連接,只要一個失敗,則所有的均失敗,將數(shù)據(jù)整合到用戶自定義結(jié)構(gòu)或數(shù)組中 ,兩個控制器中只保留一個連接。
- 生產(chǎn)者消費(fèi)者信息可以通過以太網(wǎng)和Controlnet傳輸,但是同時(shí)只能通過一種途徑傳輸;
- 建立標(biāo)簽時(shí)必須建立在全局變量里面,不能建立在局部變量里標(biāo)簽的大小不能超過500B;
- 如果生產(chǎn)者幾個數(shù)據(jù)傳輸?shù)降酵粋€控制器的的幾個消費(fèi)者中,將幾個數(shù)據(jù)合并在一個用戶自定義標(biāo)簽中,可以減少連接數(shù),但合并后的數(shù)據(jù)將會會用相同的RPI。
- 生產(chǎn)者消費(fèi)者標(biāo)簽只能用DINT和REAL,或它們的數(shù)組,或用戶自定義結(jié)構(gòu)數(shù)據(jù),因?yàn)閷ν獠僮鲾?shù)據(jù)必須是32位的,如果有SINT和INT的數(shù)據(jù)要傳輸,必須將它們組合在用戶自定義結(jié)構(gòu)中傳送,生產(chǎn)者和消費(fèi)者的標(biāo)簽數(shù)據(jù)格式必須一致,才能確保數(shù)據(jù)的準(zhǔn)確性,如果數(shù)據(jù)打包后超過了 32位,那么生產(chǎn)者和消費(fèi)者雙方必須使用一個復(fù)制緩沖指令,以獲得數(shù)據(jù)的同步,例如Control Logix中的CPS指令。
- 如果生產(chǎn)者要發(fā)送的32位數(shù)據(jù),與非Control Logix的對方設(shè)備的數(shù)據(jù)結(jié)構(gòu)不匹配,例如對方是16位的數(shù)據(jù),為避免偏差,改為用戶自定義結(jié)構(gòu)。
- 消費(fèi)者的 RPI必須大于等于網(wǎng)絡(luò)刷新時(shí)間NUT,如果幾個消費(fèi)者請求同一個生產(chǎn)者,則會以最小最快的RPI為準(zhǔn)。
三、實(shí)現(xiàn)方式
方法一:
import threading,queue,time # 創(chuàng)建一個隊(duì)列,隊(duì)列最大長度為2 q = queue.Queue(maxsize=2) def product(): while True: # 生產(chǎn)者往隊(duì)列塞數(shù)據(jù) q.put('money') print('生產(chǎn)了money, 生產(chǎn)時(shí)間:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) def consume(): while True: time.sleep(0.5) # 消費(fèi)者取出數(shù)據(jù) data = q.get() print('消費(fèi)了%s, 消費(fèi)時(shí)間%s' % (data, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))) t = threading.Thread(target=product) t1 = threading.Thread(target=consume) t.start() t1.start()
缺點(diǎn):
實(shí)現(xiàn)了多少個消費(fèi)者consumer進(jìn)程,就需要在最后往隊(duì)列中添加多少個None標(biāo)識,方便生產(chǎn)完畢結(jié)束消費(fèi)者consumer進(jìn)程。否則,p.get() 不到任務(wù)會阻塞子進(jìn)程,因?yàn)閣hile循環(huán),直到隊(duì)列q中有新的任務(wù)加進(jìn)來,才會再次執(zhí)行。而我們的生產(chǎn)者只能生產(chǎn)這么多東西,所以相當(dāng)于程序卡死。
方法二:
from multiprocessing import JoinableQueue,Process import time def producer(q): for i in range(4): time.sleep(0.5) f = '生產(chǎn)者:已經(jīng)生產(chǎn)' q.put(f) print(f) q.join() # 一直阻塞,等待消耗完所有的數(shù)據(jù)后才釋放 def consumer(name, q): while True: food = q.get() print('\033[消費(fèi)者:消費(fèi)了%s\033' % name) time.sleep(0.5) q.task_done() # 每次消耗減1 if __name__ == '__main__': q = JoinableQueue() # 創(chuàng)建隊(duì)列 # 模擬生產(chǎn)者隊(duì)列 p1 = Process(target=producer, args=(q, )) p1.start() # 模擬消費(fèi)者隊(duì)列 c1 = Process(target=consumer, args=('money', q)) c1.daemon = True # 守護(hù)進(jìn)程:主進(jìn)程結(jié)束,子進(jìn)程也會結(jié)束 c1.start() p1.join() # 阻塞主進(jìn)程,等到p1子進(jìn)程結(jié)束才往下執(zhí)行
優(yōu)點(diǎn):參考地址
- 使用JoinableQueue組件,是因?yàn)镴oinableQueue中有兩個方法:task_done()和join() 。首先說join()和Process中的join()的效果類似,都是阻塞當(dāng)前進(jìn)程,防止當(dāng)前進(jìn)程結(jié)束。但是JoinableQueue的join()是和task_down()配合使用的。
- Process中的join()是等到子進(jìn)程中的代碼執(zhí)行完畢,就會執(zhí)行主進(jìn)程join()下面的代碼。而JoinableQueue中的join()是等到隊(duì)列中的任務(wù)數(shù)量為0的時(shí)候才會執(zhí)行q.join()下面的代碼,否則會一直阻塞。
- task_down()方法是每獲取一次隊(duì)列中的任務(wù),就需要執(zhí)行一次。直到隊(duì)列中的任務(wù)數(shù)為0的時(shí)候,就會執(zhí)行JoinableQueue的join()后面的方法了。所以生產(chǎn)者生產(chǎn)完所有的數(shù)據(jù)后,會一直阻塞著。不讓p1和p2進(jìn)程結(jié)束。等到消費(fèi)者get()一次數(shù)據(jù),就會執(zhí)行一次task_down()方法,從而隊(duì)列中的任務(wù)數(shù)量減1,當(dāng)數(shù)量為0后,執(zhí)行JoinableQueue的join()后面代碼,從而p1和p2進(jìn)程結(jié)束。
- 因?yàn)閜1和p2添加了join()方法,所以當(dāng)子進(jìn)程中的consumer方法執(zhí)行完后,才會往下執(zhí)行。從而主進(jìn)程結(jié)束。因?yàn)檫@里把消費(fèi)者進(jìn)程c1和c2 設(shè)置成了守護(hù)進(jìn)程,主進(jìn)程結(jié)束的同時(shí),c1和c2 進(jìn)程也會隨之結(jié)束,進(jìn)程都結(jié)束了。所以消費(fèi)者consumer方法也會結(jié)束。
到此這篇關(guān)于python多進(jìn)程中的生產(chǎn)者和消費(fèi)者模型詳解的文章就介紹到這了,更多相關(guān)python生產(chǎn)者和消費(fèi)者模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python實(shí)現(xiàn)數(shù)組求和與平均值
這篇文章主要介紹了python實(shí)現(xiàn)數(shù)組求和與平均值方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05Python 25行代碼實(shí)現(xiàn)的RSA算法詳解
這篇文章主要介紹了Python 25行代碼實(shí)現(xiàn)的RSA算法,結(jié)合實(shí)例形式詳細(xì)分析了rsa加密算法的概念、原理、相關(guān)實(shí)現(xiàn)技巧與注意事項(xiàng),需要的朋友可以參考下2018-04-04python實(shí)現(xiàn)得到當(dāng)前登錄用戶信息的方法
這篇文章主要介紹了python實(shí)現(xiàn)得到當(dāng)前登錄用戶信息的方法,結(jié)合實(shí)例形式分析了Python在Linux平臺以及Windows平臺使用相關(guān)模塊獲取用戶信息的相關(guān)操作技巧,需要的朋友可以參考下2019-06-06pyTorch深度學(xué)習(xí)softmax實(shí)現(xiàn)解析
這篇文章主要介紹了pytorch深度學(xué)習(xí)中對softmax實(shí)現(xiàn)進(jìn)行了詳細(xì)解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-09-09