Python 多進(jìn)程原理及實(shí)現(xiàn)
1 進(jìn)程的基本概念
什么是進(jìn)程?
進(jìn)程就是一個(gè)程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行過(guò)程。進(jìn)程一般由程序、數(shù)據(jù)集、進(jìn)程控制塊三部分組成。我們編寫(xiě)的程序用來(lái)描述進(jìn)程要完成哪些功能以及如何完成;數(shù)據(jù)集則是程序在執(zhí)行過(guò)程中所需要使用的資源;進(jìn)程控制塊用來(lái)記錄進(jìn)程的外部特征,描述進(jìn)程的執(zhí)行變化過(guò)程,系統(tǒng)可以利用它來(lái)控制和管理進(jìn)程,它是系統(tǒng)感知進(jìn)程存在的唯一標(biāo)志。
進(jìn)程的生命周期:創(chuàng)建(New)、就緒(Runnable)、運(yùn)行(Running)、阻塞(Block)、銷毀(Destroy)
進(jìn)程的狀態(tài)(分類):(Actived)活動(dòng)進(jìn)程、可見(jiàn)進(jìn)程(Visiable)、后臺(tái)進(jìn)程(Background)、服務(wù)進(jìn)程(Service)、空進(jìn)程
2 父進(jìn)程和子進(jìn)程
Linux 操作系統(tǒng)提供了一個(gè) fork() 函數(shù)用來(lái)創(chuàng)建子進(jìn)程,這個(gè)函數(shù)很特殊,調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)是將當(dāng)前的進(jìn)程(父進(jìn)程)復(fù)制了一份(子進(jìn)程),然后分別在父進(jìn)程和子進(jìn)程內(nèi)返回。子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的 PID。我們可以通過(guò)判斷返回值是不是 0 來(lái)判斷當(dāng)前是在父進(jìn)程還是子進(jìn)程中執(zhí)行。
在 Python 中同樣提供了 fork() 函數(shù),此函數(shù)位于 os 模塊下。
# -*- coding: utf-8 -*- import os import time print("在創(chuàng)建子進(jìn)程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) pid = os.fork() if pid == 0: print("子進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) time.sleep(5) else: print("父進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) # pid表示回收的子進(jìn)程的pid #pid, result = os.wait() # 回收子進(jìn)程資源 阻塞 time.sleep(5) #print("父進(jìn)程:回收的子進(jìn)程pid=%d" % pid) #print("父進(jìn)程:子進(jìn)程退出時(shí) result=%d" % result) # 下面的內(nèi)容會(huì)被打印兩次,一次是在父進(jìn)程中,一次是在子進(jìn)程中。 # 父進(jìn)程中拿到的返回值是創(chuàng)建的子進(jìn)程的pid,大于0 print("fork創(chuàng)建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
2.1 父子進(jìn)程如何區(qū)分?
子進(jìn)程是父進(jìn)程通過(guò)fork()產(chǎn)生出來(lái)的,pid = os.fork()
通過(guò)返回值pid是否為0,判斷是否為子進(jìn)程,如果是0,則表示是子進(jìn)程
由于 fork() 是 Linux 上的概念,所以如果要跨平臺(tái),最好還是使用 subprocess 模塊來(lái)創(chuàng)建子進(jìn)程。
2.2 子進(jìn)程如何回收?
python中采用os.wait()方法用來(lái)回收子進(jìn)程占用的資源
pid, result = os.wait() # 回收子進(jìn)程資源 阻塞,等待子進(jìn)程執(zhí)行完成回收
如果有子進(jìn)程沒(méi)有被回收的,但是父進(jìn)程已經(jīng)死掉了,這個(gè)子進(jìn)程就是僵尸進(jìn)程。
3 Python進(jìn)程模塊
python的進(jìn)程multiprocessing模塊有多種創(chuàng)建進(jìn)程的方式,每種創(chuàng)建方式和進(jìn)程資源的回收都不太相同,下面分別針對(duì)Process,Pool及系統(tǒng)自帶的fork三種進(jìn)程分析。
3.1 fork()
import os pid = os.fork() # 創(chuàng)建一個(gè)子進(jìn)程 os.wait() # 等待子進(jìn)程結(jié)束釋放資源 pid為0的代表子進(jìn)程。
缺點(diǎn):
1.兼容性差,只能在類linux系統(tǒng)下使用,windows系統(tǒng)不可使用;
2.擴(kuò)展性差,當(dāng)需要多條進(jìn)程的時(shí)候,進(jìn)程管理變得很復(fù)雜;
3.會(huì)產(chǎn)生“孤兒”進(jìn)程和“僵尸”進(jìn)程,需要手動(dòng)回收資源。
優(yōu)點(diǎn):
是系統(tǒng)自帶的接近低層的創(chuàng)建方式,運(yùn)行效率高。
3.2 Process進(jìn)程
multiprocessing模塊提供Process類實(shí)現(xiàn)新建進(jìn)程
# -*- coding: utf-8 -*- import os from multiprocessing import Process import time def fun(name): print("2 子進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print("hello " + name) def test(): print('ssss') if __name__ == "__main__": print("1 主進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps = Process(target=fun, args=('jingsanpang', )) print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("3 進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print(ps.is_alive()) # 啟動(dòng)之前 is_alive為False(系統(tǒng)未創(chuàng)建) ps.start() print(ps.is_alive()) # 啟動(dòng)之后,is_alive為T(mén)rue(系統(tǒng)已創(chuàng)建) print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("4 進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.join() # 等待子進(jìn)程完成任務(wù) 類似于os.wait() print(ps.is_alive()) print("5 進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.terminate() #終斷進(jìn)程 print("6 進(jìn)程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
特點(diǎn):
1.注意:Process對(duì)象可以創(chuàng)建進(jìn)程,但Process對(duì)象不是進(jìn)程,其刪除與否與系統(tǒng)資源是否被回收沒(méi)有直接的關(guān)系。
2.主進(jìn)程執(zhí)行完后會(huì)默認(rèn)等待子進(jìn)程結(jié)束后回收資源,不需要手動(dòng)回收資源;join()函數(shù)用來(lái)控制子進(jìn)程結(jié)束的順序,其內(nèi)部也有一個(gè)清除僵尸進(jìn)程的函數(shù),可以回收資源;
3.Process進(jìn)程創(chuàng)建時(shí),子進(jìn)程會(huì)將主進(jìn)程的Process對(duì)象完全復(fù)制一份,這樣在主進(jìn)程和子進(jìn)程各有一個(gè) Process對(duì)象,但是p.start()啟動(dòng)的是子進(jìn)程,主進(jìn)程中的Process對(duì)象作為一個(gè)靜態(tài)對(duì)象存在,不執(zhí)行。
4.當(dāng)子進(jìn)程執(zhí)行完畢后,會(huì)產(chǎn)生一個(gè)僵尸進(jìn)程,其會(huì)被join函數(shù)回收,或者再有一條進(jìn)程開(kāi)啟,start函數(shù)也會(huì)回收僵尸進(jìn)程,所以不一定需要寫(xiě)join函數(shù)。
5.windows系統(tǒng)在子進(jìn)程結(jié)束后會(huì)立即自動(dòng)清除子進(jìn)程的Process對(duì)象,而linux系統(tǒng)子進(jìn)程的Process對(duì)象如果沒(méi)有join函數(shù)和start函數(shù)的話會(huì)在主進(jìn)程結(jié)束后統(tǒng)一清除。
另外還可以通過(guò)繼承Process對(duì)象來(lái)重寫(xiě)run方法創(chuàng)建進(jìn)程
3.3 進(jìn)程池POOL (多個(gè)進(jìn)程)
import multiprocessing import time def work(msg): mult_proces_name = multiprocessing.current_process().name print('process: ' + mult_proces_name + '-' + msg) if __name__ == "__main__": pool = multiprocessing.Pool(processes=5) # 創(chuàng)建5個(gè)進(jìn)程 for i in range(20): msg = "process %d" %(i) pool.apply_async(work, (msg, )) pool.close() # 關(guān)閉進(jìn)程池,表示不能在往進(jìn)程池中添加進(jìn)程 pool.join() # 等待進(jìn)程池中的所有進(jìn)程執(zhí)行完畢,必須在close()之后調(diào)用 print("Sub-process all done.")
上述代碼中的pool.apply_async()是apply()函數(shù)的變體,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主進(jìn)程會(huì)被阻塞直到函數(shù)執(zhí)行結(jié)束,所以說(shuō)是阻塞版本。apply()既是Pool的方法,也是Python內(nèi)置的函數(shù),兩者等價(jià)??梢钥吹捷敵鼋Y(jié)果并不是按照代碼for循環(huán)中的順序輸出的。
多個(gè)子進(jìn)程并返回值
apply_async()本身就可以返回被進(jìn)程調(diào)用的函數(shù)的返回值。上一個(gè)創(chuàng)建多個(gè)子進(jìn)程的代碼中,如果在函數(shù)func中返回一個(gè)值,那么pool.apply_async(func, (msg, ))的結(jié)果就是返回pool中所有進(jìn)程的值的對(duì)象(注意是對(duì)象,不是值本身)。
import multiprocessing import time def func(msg): return multiprocessing.current_process().name + '-' + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創(chuàng)建4個(gè)進(jìn)程 results = [] for i in range(20): msg = "process %d" %(i) results.append(pool.apply_async(func, (msg, ))) pool.close() # 關(guān)閉進(jìn)程池,表示不能再往進(jìn)程池中添加進(jìn)程,需要在join之前調(diào)用 pool.join() # 等待進(jìn)程池中的所有進(jìn)程執(zhí)行完畢 print ("Sub-process(es) done.") for res in results: print (res.get())
與之前的輸出不同,這次的輸出是有序的。
如果電腦是八核,建立8個(gè)進(jìn)程,在Ubuntu下輸入top命令再按下大鍵盤(pán)的1,可以看到每個(gè)CPU的使用率是比較平均的
4 進(jìn)程間通信方式
管道pipe:管道是一種半雙工的通信方式,數(shù)據(jù)只能單向流動(dòng),而且只能在具有親緣關(guān)系的進(jìn)程間使用。進(jìn)程的親緣關(guān)系通常是指父子進(jìn)程關(guān)系。
命名管道FIFO:有名管道也是半雙工的通信方式,但是它允許無(wú)親緣關(guān)系進(jìn)程間的通信。
消息隊(duì)列MessageQueue:消息隊(duì)列是由消息的鏈表,存放在內(nèi)核中并由消息隊(duì)列標(biāo)識(shí)符標(biāo)識(shí)。消息隊(duì)列克服了信號(hào)傳遞信息少、管道只能承載無(wú)格式字節(jié)流以及緩沖區(qū)大小受限等缺點(diǎn)。
共享存儲(chǔ)SharedMemory:共享內(nèi)存就是映射一段能被其他進(jìn)程所訪問(wèn)的內(nèi)存,這段共享內(nèi)存由一個(gè)進(jìn)程創(chuàng)建,但多個(gè)進(jìn)程都可以訪問(wèn)。共享內(nèi)存是最快的 IPC 方式,它是針對(duì)其他進(jìn)程間通信方式運(yùn)行效率低而專門(mén)設(shè)計(jì)的。它往往與其他通信機(jī)制,如信號(hào)兩,配合使用,來(lái)實(shí)現(xiàn)進(jìn)程間的同步和通信。
以上幾種進(jìn)程間通信方式中,消息隊(duì)列是使用的比較頻繁的方式。
(1)管道pipe
import multiprocessing def foo(conn): conn.send('hello father') #向管道pipe發(fā)消息 print(conn.recv()) if __name__ == '__main__': conn1,conn2=multiprocessing.Pipe(True) #開(kāi)辟兩個(gè)口,都是能進(jìn)能出,括號(hào)中如果False即單向通信 p=multiprocessing.Process(target=foo,args=(conn1,)) #子進(jìn)程使用sock口,調(diào)用foo函數(shù) p.start() print(conn2.recv()) #主進(jìn)程使用conn口接收,從管道(Pipe)中讀取消息 conn2.send('hi son') #主進(jìn)程使用conn口發(fā)送
(2)消息隊(duì)列Queue
Queue是多進(jìn)程的安全隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue的一些常用方法:
- Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;
- Queue.empty():如果隊(duì)列為空,返回True,反之False ;
- Queue.full():如果隊(duì)列滿了,返回True,反之False;
- Queue.get():獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,可傳參超時(shí)時(shí)長(zhǎng)。
- Queue.get_nowait():相當(dāng)Queue.get(False),取不到值時(shí)觸發(fā)異常:Empty;
- Queue.put():將一個(gè)值添加進(jìn)數(shù)列,可傳參超時(shí)時(shí)長(zhǎng)。
- Queue.put_nowait():相當(dāng)于Queue.get(False),當(dāng)隊(duì)列滿了時(shí)報(bào)錯(cuò):Full。
案例:
from multiprocessing import Process, Queue import time def write(q): for i in ['A', 'B', 'C', 'D', 'E']: print('Put %s to queue' % i) q.put(i) time.sleep(0.5) def read(q): while True: v = q.get(True) print('get %s from queue' % v) if __name__ == '__main__': q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) print('write process = ', pw) print('read process = ', pr) pw.start() pr.start() pw.join() pr.join() pr.terminate() pw.terminate()
Queue和pipe只是實(shí)現(xiàn)了數(shù)據(jù)交互,并沒(méi)實(shí)現(xiàn)數(shù)據(jù)共享,即一個(gè)進(jìn)程去更改另一個(gè)進(jìn)程的數(shù)據(jù)。
注:進(jìn)程間通信應(yīng)該盡量避免使用共享數(shù)據(jù)的方式
5 多進(jìn)程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者
以下通過(guò)多進(jìn)程實(shí)現(xiàn)生產(chǎn)者,消費(fèi)者模式
import multiprocessing from multiprocessing import Process from time import sleep import time class MultiProcessProducer(multiprocessing.Process): def __init__(self, num, queue): """Constructor""" multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print('producer start ' + str(self.num)) for i in range(1000): self.queue.put((i, self.num)) # print 'producer put', i, self.num t2 = time.time() print('producer exit ' + str(self.num)) use_time = str(t2 - t1) print('producer ' + str(self.num) + ', use_time: '+ use_time) class MultiProcessConsumer(multiprocessing.Process): def __init__(self, num, queue): """Constructor""" multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print('consumer start ' + str(self.num)) while True: d = self.queue.get() if d != None: # print 'consumer get', d, self.num continue else: break t2 = time.time() print('consumer exit ' + str(self.num)) print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1)) def main(): # create queue queue = multiprocessing.Queue() # create processes producer = [] for i in range(5): producer.append(MultiProcessProducer(i, queue)) consumer = [] for i in range(5): consumer.append(MultiProcessConsumer(i, queue)) # start processes for i in range(len(producer)): producer[i].start() for i in range(len(consumer)): consumer[i].start() # wait for processs to exit for i in range(len(producer)): producer[i].join() for i in range(len(consumer)): queue.put(None) for i in range(len(consumer)): consumer[i].join() print('all done finish') if __name__ == "__main__": main()
6 總結(jié)
python中的多進(jìn)程創(chuàng)建有以下兩種方式:
(1)fork子進(jìn)程
(2)采用 multiprocessing 這個(gè)庫(kù)創(chuàng)建子進(jìn)程
需要注意的是隊(duì)列中queue.Queue是線程安全的,但并不是進(jìn)程安全,所以多進(jìn)程一般使用線程、進(jìn)程安全的multiprocessing.Queue()
另外, 進(jìn)程池使用 multiprocessing.Pool實(shí)現(xiàn),pool = multiprocessing.Pool(processes = 3),產(chǎn)生一個(gè)進(jìn)程池,pool.apply_async實(shí)現(xiàn)非租塞模式,pool.apply實(shí)現(xiàn)阻塞模式。
apply_async和 apply函數(shù),前者是非阻塞的,后者是阻塞??梢钥闯鲞\(yùn)行時(shí)間相差的倍數(shù)正是進(jìn)程池?cái)?shù)量。
同時(shí)可以通過(guò)result.append(pool.apply_async(func, (msg, )))獲取非租塞式調(diào)用結(jié)果信息的。
以上就是Python 多進(jìn)程原理及實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于python 多進(jìn)程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實(shí)現(xiàn)的企業(yè)粉絲抽獎(jiǎng)功能示例
這篇文章主要介紹了Python實(shí)現(xiàn)的企業(yè)粉絲抽獎(jiǎng)功能,涉及Python數(shù)值運(yùn)算與隨機(jī)數(shù)生成相關(guān)操作技巧,需要的朋友可以參考下2019-07-07通過(guò)實(shí)例解析Python文件操作實(shí)現(xiàn)步驟
這篇文章主要介紹了通過(guò)實(shí)例解析Python文件操作實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Python數(shù)據(jù)分析之獲取雙色球歷史信息的方法示例
這篇文章主要介紹了Python數(shù)據(jù)分析之獲取雙色球歷史信息的方法,涉及Python網(wǎng)頁(yè)抓取、正則匹配、文件讀寫(xiě)及數(shù)值運(yùn)算等相關(guān)操作技巧,需要的朋友可以參考下2018-02-02python 爬取馬蜂窩景點(diǎn)翻頁(yè)文字評(píng)論的實(shí)現(xiàn)
這篇文章主要介紹了python 爬取馬蜂窩景點(diǎn)翻頁(yè)文字評(píng)論的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01聊聊python中令人迷惑的duplicated和drop_duplicates()用法
這篇文章主要介紹了聊聊python中令人迷惑的duplicated和drop_duplicates()用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-05-05Pycharm創(chuàng)建項(xiàng)目時(shí)如何自動(dòng)添加頭部信息
這篇文章主要介紹了Pycharm創(chuàng)建項(xiàng)目時(shí) 自動(dòng)添加頭部信息,需要的朋友可以參考下2019-11-11