Python中的進(jìn)程操作模塊(multiprocess.process)
一、multiprocess模塊
multiprocess不是一個模塊而是python中一個操作、管理進(jìn)程的包。
子模塊分為四個部分:
- 創(chuàng)建進(jìn)程部分(multiprocess.process)
- 進(jìn)程同步部分((multiprocess.Lock))
- 進(jìn)程池部分((multiprocess.Pool))
- 進(jìn)程之間數(shù)據(jù)共享(ThreadLocal、multiprocess.Queue、Pipes)
二、multiprocess.process模塊
process模塊是一個創(chuàng)建進(jìn)程的模塊,借助這個模塊,就可以完成進(jìn)程的創(chuàng)建。
在windows中使用process模塊的注意事項(xiàng)
在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進(jìn)程的機(jī)制),在創(chuàng)建子進(jìn)程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執(zhí)行了整個文件。
因此如果將process()直接寫在文件中就會無限遞歸創(chuàng)建子進(jìn)程報錯。所以必須把創(chuàng)建子進(jìn)程的部分使用if \_\_name\_\_ =='\_\_main\_\_'
判斷保護(hù)起來,import 的時候,就不會遞歸運(yùn)行了。
1、使用process模塊創(chuàng)建進(jìn)程
在一個python進(jìn)程中開啟子進(jìn)程,start方法和并發(fā)效果。
1 在Python中啟動的第一個子進(jìn)程
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進(jìn)程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() # p.join() print('我是父進(jìn)程')
2、 查看主進(jìn)程和子進(jìn)程的進(jìn)程號
import os from multiprocessing import Process def f(x): print('子進(jìn)程id :',os.getpid(),'父進(jìn)程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進(jìn)程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
3、 進(jìn)階,多個進(jìn)程同時運(yùn)行
注意,子進(jìn)程的執(zhí)行順序不是根據(jù)啟動順序決定的。
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父進(jìn)程在執(zhí)行')
4、 通過繼承Process類開啟進(jìn)程
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() # start會自動調(diào)用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主線程')
5、 進(jìn)程之間的數(shù)據(jù)隔離問題
from multiprocessing import Process def work(): global n n=0 print('子進(jìn)程內(nèi): ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進(jìn)程內(nèi): ',n)
2、守護(hù)進(jìn)程daemon
會隨著主進(jìn)程的結(jié)束而結(jié)束。
主進(jìn)程創(chuàng)建守護(hù)進(jìn)程
其一:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進(jìn)程之間是互相獨(dú)立的,主進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止。
1、 守護(hù)進(jìn)程的啟動
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True # 一定要在p.start()前設(shè)置,設(shè)置p為守護(hù)進(jìn)程,禁止p創(chuàng)建子進(jìn)程,并且父進(jìn)程代碼執(zhí)行結(jié)束,p即終止運(yùn)行 p.start() time.sleep(10) # 在sleep時查看進(jìn)程id對應(yīng)的進(jìn)程ps -ef|grep id print('主')
2、 主進(jìn)程代碼執(zhí)行結(jié)束守護(hù)進(jìn)程立即結(jié)束
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------") # 打印該行則主進(jìn)程代碼結(jié)束,則守護(hù)進(jìn)程p1應(yīng)該被終止.#可能會有p1任務(wù)執(zhí)行的打印信息123,因?yàn)橹鬟M(jìn)程打印main----時,p1也執(zhí)行了,但是隨即被終止,p2可以打印出來。
3、socket聊天并發(fā)實(shí)例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': # windows下start進(jìn)程一定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr))
4、進(jìn)程對象的其他方法:terminate和is_alive
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網(wǎng)紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網(wǎng)紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關(guān)閉進(jìn)程,不會立即關(guān)閉,所以is_alive立刻查看的結(jié)果可能還是存活 print(p1.is_alive()) #結(jié)果為True print('開始') print(p1.is_alive()) #結(jié)果為False
5、進(jìn)程對象的其他屬性:pid和name
class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標(biāo)示進(jìn)程的名字 super().__init__() # 執(zhí)行父類的初始化方法會覆蓋name屬性 # self.name = person # 在這里設(shè)置就可以修改進(jìn)程名字了 # self.person = person # 如果不想覆蓋進(jìn)程名,就修改屬性名稱就可以了 def run(self): print('%s正在和網(wǎng)紅臉聊天' %self.name) # print('%s正在和網(wǎng)紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網(wǎng)紅臉聊天' %self.name) # print('%s正在和網(wǎng)紅臉聊天' %self.person) p1=Myprocess('哪吒') p1.start() print(p1.pid) #可以查看子進(jìn)程的進(jìn)程id
6、參考:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實(shí)例化得到的對象,表示一個子進(jìn)程中的任務(wù)(尚未啟動)
強(qiáng)調(diào):
1.需要使用關(guān)鍵字的方式來指定參數(shù)
2.args指定的為傳給target函數(shù)的位置參數(shù),是一個元組形式,必須有逗號
參數(shù)介紹:
•group參數(shù)未使用,值始終為None
•target表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù)
•args表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'egon',)
•kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18}
•name為子進(jìn)程的名稱
1、 方法介紹
•p.start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
•p.run():進(jìn)程啟動時運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法
•p.terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖
•p.is_alive():如果p仍然運(yùn)行,返回True
•p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時時間,需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程
2、 屬性介紹
•p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時,p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置
•p.name:進(jìn)程的名稱
•p.pid:進(jìn)程的pid
•p.exitcode:進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束(了解即可)
•p.authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時才能成功(了解即可)
二、進(jìn)程同步(multiprocess.Lock)
當(dāng)多個進(jìn)程使用同一份數(shù)據(jù)資源的時候,就會引發(fā)數(shù)據(jù)安全或順序混亂問題。
1、多進(jìn)程搶占輸出資源
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s:%s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 0: 15620 is running # 1: 19688 is running # 2: 15892 is running # 1:19688 is done # 0:15620 is done # 2:15892 is done
2、使用鎖維護(hù)執(zhí)行順序
由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競爭,確實(shí)會浪費(fèi)了時間,卻保證了數(shù)據(jù)的安全。
import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start() # 1: 24776 is running # 1: 24776 is done # 0: 23588 is running # 0: 23588 is done # 2: 27308 is running # 2: 27308 is done
3、多進(jìn)程同時搶購余票
# 文件db的內(nèi)容為:{"count":5} # 注意一定要用雙引號,不然json無法識別 # 并發(fā)運(yùn)行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂 from multiprocessing import Process, Lock import time, json, random def search(): dic = json.load(open('db')) print('剩余票數(shù)%s' % dic['count']) def get(): dic = json.load(open('db')) time.sleep(random.random()) # 模擬讀數(shù)據(jù)的網(wǎng)絡(luò)延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.random()) # 模擬寫數(shù)據(jù)的網(wǎng)絡(luò)延遲 json.dump(dic, open('db', 'w')) print('購票成功') else: print('購票失敗') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬并發(fā)10個客戶端搶票 p = Process(target=task, args=(lock,)) p.start()
雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信,但問題是:
- 效率低(共享數(shù)據(jù)基于文件,而文件是硬盤上的數(shù)據(jù))
- 需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
- 效率高(多個進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))
- 幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊列和管道。
隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中,隊列又是基于(管道+鎖)實(shí)現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來,我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊列,避免處理復(fù)雜的同步和鎖問題,而且在進(jìn)程數(shù)目增多時,往往可以獲得更好的可獲展性。
三、進(jìn)程間通信IPC(Inter-Process Communication) (multiprocess.Queue)
1、 概念介紹——隊列multiprocess.Queue
創(chuàng)建共享的進(jìn)程隊列,Queue是多進(jìn)程安全的隊列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊列。
參數(shù) :maxsize是隊列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。
底層隊列使用管道和鎖定實(shí)現(xiàn)。另外,還需要運(yùn)行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>
2、 方法介紹
q.get( [ block [ ,timeout ] ] )
:返回q中的一個項(xiàng)目。如果q為空,此方法將阻塞,直到隊列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎?,將引發(fā)Queue.Empty異常。q.get_nowait()
:同q.get(False)
方法。q.put(item [, block [,timeout ] ] )
:將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發(fā)Queue.Full異常。q.qsize()
:返回隊列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。q.empty()
:如果調(diào)用此方法時 q為空,返回True。如果其他進(jìn)程或線程正在往隊列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊列中可能已經(jīng)加入新的項(xiàng)目。q.full()
:如果q已滿,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()
方法)。q.close()
:關(guān)閉隊列,防止隊列中加入更多數(shù)據(jù)。調(diào)用此方法時,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù),但將在此方法完成時馬上關(guān)閉。如果q被垃圾收集,將自動調(diào)用此方法。關(guān)閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號或異常。例如,如果某個使用者正被阻塞在get()
操作上,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()
方法返回錯誤。q.cancel_join_thread()
:不會再進(jìn)程退出時自動連接后臺線程。這可以防止join_thread()
方法阻塞。q.join_thread()
:連接隊列的后臺線程。此方法用于在調(diào)用q.close()
方法后,等待所有隊列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread()
方法可以禁止這種行為。
3、代碼實(shí)例——multiprocess.Queue
1、 單看隊列用法
這個例子還沒有加入進(jìn)程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
''' multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實(shí)現(xiàn)的,但是隊列接口 ''' from multiprocessing import Queue q = Queue(3) # put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(2) q.put(1) # q.put(3) # 如果隊列已經(jīng)滿了,程序就會停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊列。 # 如果隊列中的數(shù)據(jù)一直不被取走,程序就會永遠(yuǎn)停在這里。 try: q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因?yàn)殛犃袧M了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。 print('隊列已經(jīng)滿了') # 因此,我們再放入數(shù)據(jù)之前,可以先看一下隊列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。 print(q.full()) # True print(q.get()) # 3 print(q.get()) # 2 print(q.get()) # 1 # print(q.get()) # 同put方法一樣,如果隊列已經(jīng)空了,那么繼續(xù)取就會出現(xiàn)阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因?yàn)闆]取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經(jīng)空了') print(q.empty()) # True,空了
2、 子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程
一個queue的簡單應(yīng)用,使用隊列q對象調(diào)用get函數(shù)來取得隊列中最先進(jìn)入的數(shù)據(jù)。
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) # 調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊列中添加一條數(shù)據(jù)。 if __name__ == '__main__': q = Queue() # 創(chuàng)建一個Queue對象 p = Process(target=f, args=(q,)) # 創(chuàng)建一個進(jìn)程 p.start() print(q.get()) p.join() # ['Mon Dec 9 18:27:08 2019', 'from Eva', 'hello']
3、 批量生產(chǎn)數(shù)據(jù)放入隊列再批量獲取結(jié)果
import os import time import multiprocessing # 向queue中輸入數(shù)據(jù)的函數(shù) def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數(shù)據(jù)的函數(shù) def outputQ(queue): info= queue.get() print('%s%s%s' % (str(os.getpid()), '(get):', info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進(jìn)程 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) process.start() record1.append(process) # 輸出進(jìn)程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join() # 17568(get):3208(put):Mon Dec 9 18:29:17 2019 # 27620(get):24024(put):Mon Dec 9 18:29:17 2019 # 19780(get):21716(put):Mon Dec 9 18:29:17 2019 # 27576(get):27608(put):Mon Dec 9 18:29:17 2019 # 11304(get):10668(put):Mon Dec 9 18:29:18 2019 # 19732(get):20548(put):Mon Dec 9 18:29:18 2019 # 18120(get):25360(put):Mon Dec 9 18:29:18 2019 # 24752(get):21764(put):Mon Dec 9 18:29:18 2019 # 19848(get):7604(put):Mon Dec 9 18:29:18 2019 # 13888(get):10376(put):Mon Dec 9 18:29:18 2019
4、生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
1、 為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費(fèi)者模式。
2、 什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
3、 基于隊列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
import os import random import time from multiprocessing import Process, Queue def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產(chǎn)了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產(chǎn)者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費(fèi)者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
此時的問題是主進(jìn)程永遠(yuǎn)不會結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊列中再發(fā)一個結(jié)束信號,這樣消費(fèi)者在接收到結(jié)束信號后就可以break出死循環(huán)。
4、 改良版——生產(chǎn)者消費(fèi)者模型
注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號。
from multiprocessing import Process, Queue import time, random, os def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產(chǎn)了 %s' % (os.getpid(), res)) q.put(None)# 發(fā)送結(jié)束信號 def consumer(q): while True: res = q.get() if res is None: break # 收到結(jié)束信號則結(jié)束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產(chǎn)者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費(fèi)者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
5、 主進(jìn)程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號None
from multiprocessing import Process, Queue import time, random, os def producer(q): for i in range(2): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('%s 生產(chǎn)了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() if res is None: break # 收到結(jié)束信號則結(jié)束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產(chǎn)者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費(fèi)者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() p1.join() q.put(None)# 發(fā)送結(jié)束信號 print('主')
但上述解決方式,在有多個生產(chǎn)者和多個消費(fèi)者時,我們則需要用一個很low的方式去解決
6、 多個消費(fèi)者的例子:有幾個消費(fèi)者就需要發(fā)送幾次結(jié)束信號
from multiprocessing import Process, Queue import time, random, os def producer(name, q): for i in range(2): time.sleep(random.randint(1, 3)) res = '%s%s' % (name, i) q.put(res) print('%s 生產(chǎn)了 %s' % (os.getpid(), res)) def consumer(q): while True: res = q.get() if res is None: break # 收到結(jié)束信號則結(jié)束 time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生產(chǎn)者們:即廚師們 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('骨頭', q)) p3 = Process(target=producer, args=('泔水', q)) # 消費(fèi)者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 開始 p1.start() p2.start() p3.start() c1.start() p1.join() # 必須保證生產(chǎn)者全部生產(chǎn)完畢,才應(yīng)該發(fā)送結(jié)束信號 p2.join() p3.join() q.put(None) # 有幾個消費(fèi)者就應(yīng)該發(fā)送幾次結(jié)束信號None q.put(None) # 發(fā)送結(jié)束信號 print('主')
5、JoinableQueue([maxsize])可連接的共享進(jìn)程隊列
創(chuàng)建可連接的共享進(jìn)程隊列。這就像是一個Queue對象,但隊列允許項(xiàng)目的使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號和條件變量來實(shí)現(xiàn)的。
1、 方法介紹
JoinableQueue的實(shí)例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
:使用者使用此方法發(fā)出信號,表示q.get()返回的項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除的項(xiàng)目數(shù)量,將引發(fā)ValueError異常。q.join()
:生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊列中所有項(xiàng)目均被處理。阻塞將持續(xù)到為隊列中的每個項(xiàng)目均調(diào)用q.task_done()方法為止。
下面的例子說明如何建立永遠(yuǎn)運(yùn)行的進(jìn)程,使用和處理隊列上的項(xiàng)目。生產(chǎn)者將項(xiàng)目放入隊列,并等待它們被處理。
2、 JoinableQueue隊列實(shí)現(xiàn)消費(fèi)之生產(chǎn)者模型
from multiprocessing import Process, JoinableQueue import time, random, os def producer(name, q): for i in range(10): time.sleep(random.randint(1, 3)) res = '%s%s' % (name, i) q.put(res) print('%s 生產(chǎn)了 %s' % (os.getpid(), res)) q.join() # 生產(chǎn)完畢,使用此方法進(jìn)行阻塞,直到隊列中所有項(xiàng)目均被處理。 def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (os.getpid(), res)) q.task_done() # 向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了 if __name__ == '__main__': q = JoinableQueue() # 生產(chǎn)者們:即廚師們 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('骨頭', q)) p3 = Process(target=producer, args=('泔水', q)) # 消費(fèi)者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 開始 p_l = [p1, p2, p3, c1, c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') # 主進(jìn)程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊列的數(shù)據(jù) # 因而c1,c2也沒有存在的價值了,不需要繼續(xù)阻塞在進(jìn)程中影響主進(jìn)程了。應(yīng)該隨著主進(jìn)程的結(jié)束而結(jié)束,所以設(shè)置成守護(hù)進(jìn)程就可以了。
四、進(jìn)程池(multiprocess.Pool)
1、概念介紹——multiprocess.Pool
Pool([numprocess [,initializer [, initargs]]])
:創(chuàng)建進(jìn)程池
- numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用
cpu_count()
的值 - initializer:是每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None
- initargs:是要傳給initializer的參數(shù)組
2、主要方法
p.apply(func [, args [, kwargs]])
:在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()
函數(shù)或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]])
:在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。p.close()
:關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成P.join()
:等待所有工作進(jìn)程退出。此方法只能在close()
或teminate()
之后調(diào)用obj.get()
:返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)一場。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。obj.ready()
:如果調(diào)用完成,返回Trueobj.successful()
:如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常obj.wait([timeout])
:等待結(jié)果變?yōu)榭捎谩?/li>obj.terminate()
:立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)。
3、代碼實(shí)例——multiprocess.Pool
1、 同步
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù) res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞 # 但不管該任務(wù)是否存在阻塞,同步調(diào)用都會在原地等著 print(res_l)
2、 異步
import os import time import random from multiprocessing import Pool def work(n): print('%s run' % os.getpid()) time.sleep(random.random()) return n ** 2 if __name__ == '__main__': p = Pool(3) # 進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù) res_l = [] for i in range(10): res= p.apply_async(work, args=(i,)) # 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個子進(jìn)程在異步執(zhí)行 # 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù) # 需要注意的是,進(jìn)程池中的三個進(jìn)程不會同時開啟或者同時結(jié)束 # 而是執(zhí)行完一個就釋放一個進(jìn)程,這個進(jìn)程就去接收新的任務(wù)。 res_l.append(res) # 異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用join,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果 # 否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了 p.close() p.join() for res in res_l: print(res.get() ) # 使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
4、進(jìn)程池版socket并發(fā)聊天練習(xí)
1、 server
#Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count()) #開啟6個客戶端,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài) #在每個進(jìn)程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個,即多個客戶端公用4個進(jìn)程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進(jìn)程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
2、 client
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務(wù)端同一時間只有4個不同的pid,只能結(jié)束一個客戶端,另外一個客戶端才會進(jìn)來。
5、回調(diào)函數(shù)
需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。
1、 使用多進(jìn)程請求多個url來減少網(wǎng)絡(luò)等待浪費(fèi)的時間
from multiprocessing import Pool import requests import json import os def get_page(url): print('<進(jìn)程%s> get %s' % (os.getpid(), url)) respone = requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.text} def pasrse_page(res): print('<進(jìn)程%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res) if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res =p.apply_async(get_page, args=(url,), callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) # 拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了 ''' 打印結(jié)果: <進(jìn)程3388> get https://www.baidu.com <進(jìn)程3389> get https://www.python.org <進(jìn)程3390> get https://www.openstack.org <進(jìn)程3388> get https://help.github.com/ <進(jìn)程3387> parse https://www.baidu.com <進(jìn)程3389> get http://www.sina.com.cn/ <進(jìn)程3387> parse https://www.python.org <進(jìn)程3387> parse https://help.github.com/ <進(jìn)程3387> parse http://www.sina.com.cn/ <進(jìn)程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
2、 爬蟲實(shí)例
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern), callback=parse_page) res_l.append(res) for i in res_l: i.get()
6、無需回調(diào)函數(shù)
如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)。
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢 nums=[] for res in res_l: nums.append(res.get() ) #拿到所有結(jié)果 print(nums) #主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理
進(jìn)程池的其他實(shí)現(xiàn)方法:https://docs.python.org/dev/library/concurrent.futures.html
到此這篇關(guān)于Python進(jìn)程操作模塊的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- python?包之?multiprocessing?多進(jìn)程
- python multiprocessing 多進(jìn)程并行計算的操作
- Python進(jìn)程間通信multiprocess代碼實(shí)例
- Python多進(jìn)程編程multiprocessing代碼實(shí)例
- Python進(jìn)程Multiprocessing模塊原理解析
- Python3多進(jìn)程 multiprocessing 模塊實(shí)例詳解
- 簡單學(xué)習(xí)Python多進(jìn)程Multiprocessing
- Python使用multiprocessing創(chuàng)建進(jìn)程的方法
- Python標(biāo)準(zhǔn)庫之多進(jìn)程(multiprocessing包)介紹
相關(guān)文章
python批量實(shí)現(xiàn)Word文件轉(zhuǎn)換為PDF文件
這篇文章主要為大家詳細(xì)介紹了python批量實(shí)現(xiàn)Word文件轉(zhuǎn)換為PDF文件的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-03-03python基礎(chǔ)之變量和數(shù)據(jù)類型
這篇文章主要介紹了python的變量和數(shù)據(jù)類型,實(shí)例分析了Python中返回一個返回值與多個返回值的方法,需要的朋友可以參考下2021-10-10Python+Matplotlib實(shí)現(xiàn)繪制三維折線圖
立體圖視覺上層次分明色彩鮮艷,具有很強(qiáng)的視覺沖擊力,讓觀看的人駐景時間長,留下深刻的印象。今天我們就通過這篇文章來了解如何用python中的matplotlib庫繪制漂亮的三維折線圖吧2023-03-03python smtplib模塊自動收發(fā)郵件功能(一)
這篇文章主要為大家詳細(xì)介紹了python smtplib模塊自動收發(fā)郵件功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-05-05python判斷all函數(shù)輸出結(jié)果是否為true的方法
在本篇內(nèi)容里小編給各位整理的是一篇關(guān)于python判斷all函數(shù)輸出結(jié)果是否為true的方法,有需要的朋友們可以學(xué)習(xí)下。2020-12-12Python基于smtplib協(xié)議實(shí)現(xiàn)發(fā)送郵件
這篇文章主要介紹了Python基于smtplib協(xié)議實(shí)現(xiàn)發(fā)送郵件,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06Python數(shù)學(xué)建模PuLP庫線性規(guī)劃實(shí)際案例編程詳解
本節(jié)以一個實(shí)際數(shù)學(xué)建模案例,來為大家講解PuLP求解線性規(guī)劃問題的建模與編程。來鞏固加深大家對Python數(shù)學(xué)建模PuLP庫線性規(guī)劃的運(yùn)用理解2021-10-10