python Multiprocessing.Pool進程池模塊詳解
前言
Multiprocessing.Pool可以提供指定數(shù)量的進程供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;
但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行它。
Pool類用于需要執(zhí)行的目標(biāo)很多,而手動限制進程數(shù)量又太繁瑣時,如果目標(biāo)少且不用控制進程數(shù)量則可以用Process類。
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes: 是要使用的工作進程數(shù)。如果進程是None,那么使用返回的數(shù)字os.cpu_count()。也就是說根據(jù)本地的cpu個數(shù)決定,processes小于等于本地的cpu個數(shù);
- initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調(diào)用initializer(*initargs)。
- maxtasksperchild:工作進程退出之前可以完成的任務(wù)數(shù),完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進程就會一直存活。
- context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。
實例方法
(1)apply(func [,args [,kwds ] ] )
使用參數(shù)args和關(guān)鍵字參數(shù)kwds調(diào)用func。它會阻塞,直到結(jié)果準(zhǔn)備就緒。鑒于此塊,更適合并行執(zhí)行工作。此外,func 僅在池中的一個工作程序中執(zhí)行。
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=10) for i in range(500): ''' ('\n' ' (1)遍歷500個可迭代對象,往進程池放一個子進程\n' ' (2)執(zhí)行這個子進程,等子進程執(zhí)行完畢,再往進程池放一個子進程,再執(zhí)行。(同時只執(zhí)行一個子進程)\n' ' for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。\n' ' ') ''' pool.apply(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當(dāng)一個進程執(zhí)行完后啟動一個新進程. print('test') pool.close() pool.join() ''' 1 2 3 4 5 6 7 8 Process finished with exit code -1 '''
for循環(huán)內(nèi)執(zhí)行的步驟順序,往進程池中添加一個子進程,執(zhí)行子進程,等待執(zhí)行完畢再添加一個子進程……等500個子進程都執(zhí)行完了,再執(zhí)行print。(從結(jié)果來看,并沒有多進程并發(fā))
(2)apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] )
異步進程池(非阻塞),返回結(jié)果對象的方法的變體。如果指定了回調(diào),則它應(yīng)該是可調(diào)用的,它接受單個參數(shù)。當(dāng)結(jié)果變?yōu)榫途w時,將對其應(yīng)用回調(diào),即除非調(diào)用失敗,在這種情況下將應(yīng)用error_callback。如果指定了error_callback,那么它應(yīng)該是一個可調(diào)用的,它接受一個參數(shù)。如果目標(biāo)函數(shù)失敗,則使用異常實例調(diào)用error_callback。回調(diào)應(yīng)立即完成,否則處理結(jié)果的線程將被阻止。
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=2) for i in range(500): ''' (1)循環(huán)遍歷,將500個子進程添加到進程池(相對父進程會阻塞)\n' (2)每次執(zhí)行2個子進程,等一個子進程執(zhí)行完后,立馬啟動新的子進程。(相對父進程不阻塞)\n' ''' pool.apply_async(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當(dāng)一個進程執(zhí)行完后啟動一個新進程. print('test') pool.close() pool.join() ''' test 0 1 2 3 4 5 6 7 Process finished with exit code -1 '''
調(diào)用join之前,先調(diào)用close或者terminate方法,否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束。
(3)map(func,iterable [,chunksize ] )
map()內(nèi)置函數(shù)的并行等價物(盡管它只支持一個可迭代的參數(shù))。它會阻塞,直到結(jié)果準(zhǔn)備就緒。此方法將iterable內(nèi)的每一個對象作為單獨的任務(wù)提交給進程池??梢酝ㄟ^將chunksize設(shè)置為正整數(shù)來指定這些塊的(近似)大小。
from multiprocessing import Pool def test(i): print(i) if __name__ == "__main__": lists = [1, 2, 3] pool = Pool(processes=2) #定義最大的進程數(shù) pool.map(test, lists) #p必須是一個可迭代變量。 pool.close() pool.join() ''' 1 2 3 '''
(4)map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] )
map()返回結(jié)果對象的方法的變體。需要傳入可迭代對象iterable
from multiprocessing import Pool import time def test(p): print(p) time.sleep(3) if __name__=="__main__": pool = Pool(processes=2) # for i in range(500): # ''' # (1)循環(huán)遍歷,將500個子進程添加到進程池(相對父進程會阻塞)\n' # (2)每次執(zhí)行2個子進程,等一個子進程執(zhí)行完后,立馬啟動新的子進程。(相對父進程不阻塞)\n' # ''' # pool.apply_async(test, args=(i,)) #維持執(zhí)行的進程總數(shù)為10,當(dāng)一個進程執(zhí)行完后啟動一個新進程. pool.map_async(test, range(500)) print('test') pool.close() pool.join() ''' test 0 63 1 64 2 65 3 66 Process finished with exit code -1 '''
(5)imap(func,iterable [,chunksize ] )
返回迭代器,next()調(diào)用返回的迭代器的方法得到結(jié)果,imap()方法有一個可選的超時參數(shù): next(timeout)將提高multiprocessing.TimeoutError如果結(jié)果不能內(nèi)退回超時秒。
(6)close()
防止任何更多的任務(wù)被提交到池中。 一旦完成所有任務(wù),工作進程將退出。
(7)terminate()
立即停止工作進程而不完成未完成的工作。當(dāng)池對象被垃圾收集時,terminate()將立即調(diào)用。
(8)join()
等待工作進程退出。必須打電話close()或 terminate()使用之前join()。
from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError ''' 100 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 Traceback (most recent call last): File "C:/Users/BruceWong/Desktop/develop/multiprocessingpool.py", line 19, in <module> print(next(res)) TypeError: 'MapResult' object is not an iterator Process finished with exit code 1
到此這篇關(guān)于python Multiprocessing.Pool進程池模塊詳解的文章就介紹到這了,更多相關(guān)python Multiprocessing.Pool內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何打包Python Web項目實現(xiàn)免安裝一鍵啟動的方法
這篇文章主要介紹了如何打包Python Web項目,實現(xiàn)免安裝一鍵啟動,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05python定時任務(wù)schedule庫用法詳細(xì)講解
python中有一個輕量級的定時任務(wù)調(diào)度的庫schedule,下面這篇文章主要給大家介紹了關(guān)于python定時任務(wù)schedule庫用法的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01分析python并發(fā)網(wǎng)絡(luò)通信模型
隨著互聯(lián)網(wǎng)和物聯(lián)網(wǎng)的高速發(fā)展,使用網(wǎng)絡(luò)的人數(shù)和電子設(shè)備的數(shù)量急劇增長,其也對互聯(lián)網(wǎng)后臺服務(wù)程序提出了更高的性能和并發(fā)要求。本文主要分析比較了一些模型的優(yōu)缺點,并且用python來實現(xiàn)2021-06-06