Python多進(jìn)程之進(jìn)程同步及通信詳解
上篇文章介紹了什么是進(jìn)程、進(jìn)程與程序的關(guān)系、進(jìn)程的創(chuàng)建與使用、創(chuàng)建進(jìn)程池等,接下來(lái)就來(lái)介紹一下進(jìn)程同步及進(jìn)程通信。
進(jìn)程同步
當(dāng)多個(gè)進(jìn)程使用同一份數(shù)據(jù)資源的時(shí)候,因?yàn)檫M(jìn)程的運(yùn)行沒(méi)有順序,運(yùn)行起來(lái)也無(wú)法控制,如果不加以干預(yù),往往會(huì)引發(fā)數(shù)據(jù)安全或順序混亂的問(wèn)題,所以要在多個(gè)進(jìn)程讀寫共享數(shù)據(jù)資源的時(shí)候加以適當(dāng)?shù)牟呗裕瑏?lái)保證數(shù)據(jù)的一致性問(wèn)題。
Lock(鎖)
一個(gè)Lock對(duì)象有兩個(gè)方法:acquire()和release()來(lái)控制共享數(shù)據(jù)的讀寫權(quán)限, 看下面這張圖片,使用多進(jìn)程的時(shí)候會(huì)經(jīng)常出現(xiàn)這種情況,這是因?yàn)槎鄠€(gè)進(jìn)程都在搶占輸出資源,共享同一打印終端,從而造成了輸出信息的錯(cuò)亂。
那么就可以使用Lock機(jī)制:
import multiprocessing import random import time def work(lock, i): lock.acquire() print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid) time.sleep(random.randint(0, 2)) print("work'{}'執(zhí)行完畢......".format(i)) lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(5): p = multiprocessing.Process(target=work, args=(lock, i)) p.start()
由于引入了Lock機(jī)制,同一時(shí)間只能有一個(gè)進(jìn)程搶占到輸出資源,其他進(jìn)程等待該進(jìn)程結(jié)束,鎖釋放到,才可以搶占,這樣會(huì)解決多進(jìn)程間資源競(jìng)爭(zhēng)導(dǎo)致數(shù)據(jù)錯(cuò)亂的問(wèn)題,但是由并發(fā)執(zhí)行變成了串行執(zhí)行,會(huì)犧牲運(yùn)行效率。
進(jìn)程通信
上篇文章說(shuō)過(guò),進(jìn)程之間互相隔離,數(shù)據(jù)是獨(dú)立的,默認(rèn)情況下互不影響,那要如何實(shí)現(xiàn)進(jìn)程間通信呢?Python提供了多種進(jìn)程通信的方式,下面就來(lái)說(shuō)一下。
Queue(隊(duì)列)
multiprocessing
模塊提供的Queue多進(jìn)程安全的消息隊(duì)列,可以實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
說(shuō)明
- 初始化Queue()對(duì)象時(shí)(例如:q=Queue()),若括號(hào)中沒(méi)有指定最?可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接受的消息數(shù)量沒(méi)有上限(直到內(nèi)存的盡頭)。
Queue.qsize()
:返回當(dāng)前隊(duì)列包含的消息數(shù)量。Queue.empty()
:如果隊(duì)列為空,返回True,反之False。Queue.full()
:如果隊(duì)列滿了,返回True,反之False。Queue.get(block, timeout)
:獲取隊(duì)列中的?條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True。如果block使?默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果為空,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息列隊(duì)讀到消息為?,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)讀取到任何消息,則拋出Queue.Empty異常;如果block值為False,消息列隊(duì)如果為空,則會(huì)?刻拋出Queue.Empty異常。Queue.get_nowait()
:相當(dāng)Queue.get(False)。Queue.put(item, block, timeout)
:將item消息寫?隊(duì)列,block默認(rèn)值為True,如果block使?默認(rèn)值,且沒(méi)有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒(méi)有空間可寫?,此時(shí)程序?qū)⒈蛔枞ㄍT趯?狀態(tài)),直到消息列隊(duì)騰出空間為?,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒(méi)空間,則拋出Queue.Full異常;如果block值為False,消息列隊(duì)如果沒(méi)有空間可寫?,則會(huì)?刻拋出Queue.Full異常。Queue.put_nowait(item)
:相當(dāng)于Queue.put(item, False)。
from multiprocessing import Process, Queue import time def write_task(queue): """ 向隊(duì)列中寫入數(shù)據(jù) :param queue: 隊(duì)列 :return: """ for i in range(5): if queue.full(): print("隊(duì)列已滿!") message = "消息{}".format(str(i)) queue.put(message) print("消息{}寫入隊(duì)列".format(str(i))) def read_task(queue): """ 從隊(duì)列讀取數(shù)據(jù) :param queue: 隊(duì)列 :return: """ while True: print("從隊(duì)列讀取:{}".format(queue.get(True))) if __name__ == '__main__': print("主進(jìn)程執(zhí)行......") # 主進(jìn)程創(chuàng)建Queue,最大消息數(shù)量為3 queue = Queue(3) pw = Process(target=write_task, args=(queue, )) pr = Process(target=read_task, args=(queue, )) pw.start() pr.start()
運(yùn)行結(jié)果為:
從結(jié)果我們可以看出,隊(duì)列最大可以放入3條消息,后面再來(lái)消息,要等read_task從隊(duì)列里取出后才行。
Pipe(管道)
Pipe常用于兩個(gè)進(jìn)程,兩個(gè)進(jìn)程分別位于管道的兩端,Pipe(duplex)方法返回(conn1,conn2)代表一個(gè)管道的兩端,duplex參數(shù)默認(rèn)為True,即全雙工模式,若為False,conn1只負(fù)責(zé)接收信息,conn2負(fù)責(zé)發(fā)送。
send()和recv()方法分別是發(fā)送和接受消息的方法。
import multiprocessing import time import random def proc_send(pipe): """ 發(fā)送消息 :param pipe:管道一端 :return: """ for i in range(10): print("process send:{}".format(str(i))) pipe.send(i) time.sleep(random.random()) def proc_recv(pipe): """ 接收消息 :param pipe:管道一端 :return: """ while True: print("Process recv:{}".format(pipe.recv())) time.sleep(random.random()) if __name__ == '__main__': # 主進(jìn)程創(chuàng)建pipe pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], )) p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], )) p1.start() p2.start() p1.join() p2.terminate()
執(zhí)行結(jié)果為:
Semaphore(信號(hào)量)
Semaphore用來(lái)控制對(duì)共享資源的訪問(wèn)數(shù)量,和進(jìn)程池的最大連接數(shù)類似。
import multiprocessing import random import time def work(s, i): s.acquire() print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid) time.sleep(i*2) print("work'{}'執(zhí)行完畢......".format(i)) s.release() if __name__ == '__main__': s = multiprocessing.Semaphore(2) for i in range(1, 7): p = multiprocessing.Process(target=work, args=(s, i)) p.start()
上面的代碼中使用Semaphore限制了最多有2個(gè)進(jìn)程同時(shí)執(zhí)行,那么來(lái)一個(gè)進(jìn)程獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于2時(shí),后面再來(lái)的進(jìn)程均需要等待,等前面的進(jìn)程釋放掉,才可以獲得鎖。
信號(hào)量與進(jìn)程池的概念上類似,但是要區(qū)分開來(lái),信號(hào)量涉及到加鎖的概念。
Event(事件)
Event用來(lái)實(shí)現(xiàn)進(jìn)程間同步通信的。運(yùn)行的機(jī)制是:全局定義了一個(gè)flag,如果flag值為False,當(dāng)程序執(zhí)行event.wait()方法時(shí)就會(huì)阻塞,如果flag值為True時(shí),程序執(zhí)行event.wait()方法時(shí)不會(huì)阻塞繼續(xù)執(zhí)行。
Event常?函數(shù):
event.wait()
:在進(jìn)程中插入一個(gè)標(biāo)記(flag),默認(rèn)為False,可以設(shè)置timeout。event.set()
:使flag為Ture。event.clear()
:使flag為False。event.is_set()
:判斷flag是否為True。
import multiprocessing import time def wait_for_event(e): print("wait_for_event執(zhí)行") e.wait() print("wait_for_event: e.is_set():{}".format(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout執(zhí)行") # 只會(huì)阻塞2s e.wait(t) print("wait_for_event_timeout:e.is_set:{}".format(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() p1 = multiprocessing.Process(target=wait_for_event, args=(e,)) p1.start() p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2)) p2.start() time.sleep(4) # 4s之后使用e.set()將flag設(shè)為Ture e.set() print("主進(jìn)程:flag設(shè)置為True")
執(zhí)行結(jié)果如下:
總結(jié)
本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
Python寫捕魚達(dá)人的游戲?qū)崿F(xiàn)
這篇文章主要介紹了Python寫捕魚達(dá)人的游戲?qū)崿F(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03基于Python制作一個(gè)端午節(jié)相關(guān)的小游戲
端午節(jié)快樂(lè),今天我將為大家?guī)?lái)一篇有關(guān)端午節(jié)的編程文章,希望能夠?yàn)榇蠹耀I(xiàn)上一份小小的驚喜,我們將會(huì)使用Python來(lái)實(shí)現(xiàn)一個(gè)與端午粽子相關(guān)的小應(yīng)用程序,在本文中,我將會(huì)介紹如何用Python代碼制做一個(gè)“粽子拆解器”,感興趣的小伙伴歡迎閱讀2023-06-06解決linux下使用python打開terminal時(shí)報(bào)錯(cuò)的問(wèn)題
這篇文章主要介紹了linux下使用python打開terminal時(shí)報(bào)錯(cuò),本文通過(guò)兩種場(chǎng)景分析給大家詳細(xì)講解,需要的朋友可以參考下2023-03-03python 獲取utc時(shí)間轉(zhuǎn)化為本地時(shí)間的方法
今天小編就為大家分享一篇python 獲取utc時(shí)間轉(zhuǎn)化為本地時(shí)間的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12Python爬蟲實(shí)現(xiàn)模擬點(diǎn)擊動(dòng)態(tài)頁(yè)面
這篇文章主要介紹了Python爬蟲實(shí)現(xiàn)模擬點(diǎn)擊動(dòng)態(tài)頁(yè)面,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03Python3 文章標(biāo)題關(guān)鍵字提取的例子
今天小編就為大家分享一篇Python3 文章標(biāo)題關(guān)鍵字提取的例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08解決windows下Sublime Text 2 運(yùn)行 PyQt 不顯示的方法分享
問(wèn)題描述:PyQt 環(huán)境正常,可以使用 Windows 的 虛擬 DOS 正常運(yùn)行,但在 Sublime Text 2 下使用 Ctrl + B 運(yùn)行后,界面不顯示,但查看任務(wù)管理器,有 python.exe 進(jìn)程。2014-06-06Scrapy框架爬取Boss直聘網(wǎng)Python職位信息的源碼
今天小編就為大家分享一篇關(guān)于Scrapy框架爬取Boss直聘網(wǎng)Python職位信息的源碼,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-02-02Python Numpy庫(kù)常見(jiàn)用法入門教程
這篇文章主要介紹了Python Numpy庫(kù)常見(jiàn)用法,結(jié)合實(shí)例形式詳細(xì)Fenix了Python numpy庫(kù)基本功能、原理以及數(shù)組常見(jiàn)操作技巧,需要的朋友可以參考下2020-01-01