Python進(jìn)程間的通信一起來了解下
通信方式
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊主要通過隊(duì)列方式
隊(duì)列:隊(duì)列類似于一條管道,元素先進(jìn)先出
需要注意的一點(diǎn)是:隊(duì)列都是在內(nèi)存中操作,進(jìn)程退出,隊(duì)列清空,另外,隊(duì)列也是一個阻塞的形態(tài)
Queue介紹:
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,
可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。
方法介紹:
def put(self, obj, block=True, timeout=None):插入數(shù)據(jù)到隊(duì)列中 Block值默認(rèn)為True,代表當(dāng)隊(duì)列已滿時,會阻塞。如果block為False,則隊(duì)列滿會報異常Queue.Full timeout表示會阻塞到指定時間,直到有剩余的空間供插入,如果時間超時,則報異常Queue.Full def get(self, block=True, timeout=None):從隊(duì)列中取出數(shù)據(jù) Block值默認(rèn)為True,代表當(dāng)隊(duì)列為空時,會阻塞。如果block為False,則隊(duì)列空會報異常Queue.Empty timeout表示會等待到指定時間,直到取出數(shù)據(jù),如果時間超時,則報異常Queue.Empty def empty(self): 判斷隊(duì)列是否為空,如果空返回True def full(self): 判斷隊(duì)列是否已滿,如果滿返回True def qsize(self): 返回隊(duì)列的大小
應(yīng)用舉例:
from multiprocessing import Process, Manager q = Manager().Queue(2) q.put(1) q.put(2,block=False,timeout=2) def func(): print(q.get()) p = Process(target=func) print("size",q.qsize()) print("full",q.full()) p.start() p.join() print("empty",q.empty()) print("get", q.get()) print("get", q.get(block=False,timeout=2))
輸出結(jié)果
生產(chǎn)者和消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊:
生產(chǎn)者,只需要往隊(duì)列里面丟東西(生產(chǎn)者不需要關(guān)心消費(fèi)者)
消費(fèi)者,只需要從隊(duì)列里面拿東西(消費(fèi)者也不需要關(guān)心生產(chǎn)者)
阻塞隊(duì)列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
實(shí)現(xiàn)方式一:Queue
from multiprocessing import Process,Manager,active_children import random import queue import time class Producer(Process): def __init__(self,queue): super().__init__() self.queue = queue def run(self): for i in range(6): r = random.randint(0, 99) time.sleep(1) self.queue.put(r) print("add data{}".format(r)) class Consumer(Process): def __init__(self,queue): super().__init__() self.queue = queue def run(self): while True: if not self.queue.empty(): data = self.queue.get() print("minus data{}".format(data)) if __name__ == '__main__': q = Manager().Queue() # 創(chuàng)建隊(duì)列 p = Producer(q) c = Consumer(q) p.start() c.start() print(active_children()) # 查看現(xiàn)有的進(jìn)程 p.join() c.join() print("結(jié)束")
實(shí)現(xiàn)方式二:利用JoinableQueue
JoinableQueue([maxsize]):一個Queue對象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號和條件變量來實(shí)現(xiàn)的。
JoinableQueue的實(shí)例除了與Queue對象相同的方法之外還具有:
task_done():使用者使用此方法發(fā)出信號,表示get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常
join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個項(xiàng)目均調(diào)用task_done()方法為止
from multiprocessing import Process,JoinableQueue import os import time import random def print_log(msg, log_type="prod"): if log_type == 'prod': print("\033[32;1m%s\033[0m" %msg) elif log_type == 'con': print("\033[31;1m%s\033[0m" %msg) def producer(q): """ 生產(chǎn)者 :param q: :return: """ for i in range(10): data = random.randint(1,200) time.sleep(2) q.put(data) # 放入隊(duì)列 msg = "add data {}".format(data) print_log(msg) q.join() # 生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。 # 阻塞將持續(xù)到隊(duì)列中的每個項(xiàng)目均調(diào)用q.task_done()方法為止 def consumer(q): """ 消費(fèi)者 :param q: :return: """ while True: if not q.empty(): time.sleep(5) data = q.get() msg = "minus data{}".format(data) print_log(msg,"con") q.task_done() # q.get()的返回項(xiàng)目已經(jīng)被處理 if __name__ == '__main__': q = JoinableQueue() prod = Process(target=producer, args=(q,)) con = Process(target=consumer, args=(q,)) con.daemon = True # 設(shè)置為守護(hù)進(jìn)程,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素 # 開啟進(jìn)程 prod.start() con.start() prod.join() # 等待生產(chǎn)和消費(fèi)完成,主線程結(jié)束 print("結(jié)束")
輸出結(jié)果
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
Python+OpenCV實(shí)現(xiàn)圖片中的圓形檢測
這篇文章主要介紹了如何利用Python+OpenCV實(shí)現(xiàn)檢測圖片中的圓形,文中的示例代碼講解詳細(xì),感興趣的小伙伴快跟隨小編一起學(xué)習(xí)一下2022-04-04python自帶緩存lru_cache用法及擴(kuò)展的使用
本篇博客將結(jié)合python官方文檔和源碼詳細(xì)講述lru_cache緩存方法是怎么實(shí)現(xiàn),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08python+html實(shí)現(xiàn)前后端數(shù)據(jù)交互界面顯示的全過程
最近項(xiàng)目中采用了前后端分離的技術(shù),感覺有必要給大家總結(jié)下,所以下面這篇文章主要給大家介紹了關(guān)于python+html實(shí)現(xiàn)前后端數(shù)據(jù)交互界面顯示的相關(guān)資料,需要的朋友可以參考下2022-06-06Python 26進(jìn)制計(jì)算實(shí)現(xiàn)方法
這篇文章主要介紹了Python 26進(jìn)制計(jì)算實(shí)現(xiàn)方法,涉及Python字符串與數(shù)值計(jì)算的相關(guān)操作技巧,需要的朋友可以參考下2015-05-05在MAC上搭建python數(shù)據(jù)分析開發(fā)環(huán)境
這篇文章主要介紹了在MAC上搭建python數(shù)據(jù)分析開發(fā)環(huán)境的相關(guān)資料,需要的朋友可以參考下2016-01-01Python實(shí)現(xiàn)string字符串連接的方法總結(jié)【8種方式】
這篇文章主要介紹了Python實(shí)現(xiàn)string字符串連接的方法,結(jié)合實(shí)例形式總結(jié)分析了Python實(shí)現(xiàn)字符串連接的8種常見操作技巧,需要的朋友可以參考下2018-07-07解決Pytorch中的神坑:關(guān)于model.eval的問題
這篇文章主要介紹了解決Pytorch中的神坑:關(guān)于model.eval的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-05-05