Python并行庫joblib之delayed函數(shù)與Parallel函數(shù)詳解
Joblib
Joblib就是一個可以簡單地將Python代碼轉(zhuǎn)換為并行計算模式的軟件包,它可非常簡單并行我們的程序,從而提高計算速度。
主要提供了以下功能
- 程序并行
- 用于在每次使用相同的輸入?yún)?shù)調(diào)用函數(shù)時將其返回值緩存
- 數(shù)據(jù)存儲(包括不可哈希的數(shù)據(jù)和大規(guī)模numpy數(shù)組)
程序并行
joblib提供了一個簡單地程序并行方案,主要有Parallel函數(shù)實現(xiàn),并涉及了一個技巧性的函數(shù)delayed。
delayed函數(shù)
以下為delayed函數(shù)的源碼
def delayed(function): """Decorator used to capture the arguments of a function.""" def delayed_function(*args, **kwargs): return function, args, kwargs try: delayed_function = functools.wraps(function)(delayed_function) except AttributeError: " functools.wraps fails on some callable objects " return delayed_function
*functools.wraps 旨在消除裝飾器對原函數(shù)造成的影響,即對原函數(shù)的相關(guān)屬性進行拷貝,已達到裝飾器不修改原函數(shù)的目的。
從功能上來說,可以認為被wrap修飾后的函數(shù)與原函數(shù)功能完全相同,暫時忽略不計
delayed函數(shù)顧名思義就是延遲函數(shù)的執(zhí)行。
根據(jù)源碼來看,delayed函數(shù)保留被修飾的函數(shù)function和參數(shù)*args, **kwargs,在碰到調(diào)用時,并不直接執(zhí)行函數(shù)function(*args, **kwargs),而是返回返回元組(function,args,kwargs)。
返回的這個結(jié)果留待其他函數(shù)執(zhí)行,在joblib里具體是與Parallel配合的。
下面我們通過具體例子看一下delayed函數(shù)如何工作的
import functools def delayed(function): """Decorator used to capture the arguments of a function.""" def delayed_function(*args, **kwargs): return function, args, kwargs try: delayed_function = functools.wraps(function)(delayed_function) except AttributeError: " functools.wraps fails on some callable objects " return delayed_function def f(x,y): return x+y res = delayed(f)(1,y=3) print(res)
執(zhí)行結(jié)果為:
(<function f at 0x00000000081C29D8>, (1,), {'y': 3})
返回了原始的函數(shù)f和調(diào)用它是的兩個參數(shù)。
上面也說過delayed函數(shù)其實是一個修飾器,因此上面的代碼與下面的寫法等價
@delayed def f(x,y): return x+y res = f(1,y=3) print(res)
*delayed之后并未得到函數(shù)的執(zhí)行結(jié)果,我們?nèi)绻氲玫筋A(yù)期的執(zhí)行結(jié)果應(yīng)該怎么做呢?其實delayed函數(shù)主要是與其他函數(shù)配合的,我們可以再寫一個程序進行計算:
def f(x,y): return x+y res = delayed(f)(1,y=2) print(res) #out: (<function f at 0x00000000081C2510>, (1,), {'y': 2}) foo,args,kwargs = res final_res = foo(*args,**kwargs) print(final_res) #out: 3
我們先解包了delayed之后的結(jié)果res,得到了函數(shù)f,和參數(shù)args,kwargs,然后調(diào)用foo(*args,**kwargs)得到最終得到期待的計算結(jié)果的final_res。
開始可能很難理解我們?yōu)槭裁匆舆t一個函數(shù)的調(diào)用呢,調(diào)用函數(shù)不就是為了得到執(zhí)行結(jié)果嗎,這樣延遲之后不就無法達到我們的預(yù)期了?
先別急,看看下面這個例子應(yīng)該就明白了
假設(shè)我們需要對x_list = [1,2,3], y_list = [ -1,-2,-3] 這兩個列表進行逐個元素相加。
可能會覺得這不是很容易實現(xiàn)嗎,依次遍歷兩個集合的元素,在循環(huán)過程中調(diào)用函數(shù)f進行元素相加就好了,可能寫出的代碼如下:
def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [] for x,y in zip(x_list,y_list): res.append(f(x,y)) print(res)
這樣線性的多次調(diào)用函數(shù)f當(dāng)然沒問題,但是如果我們并行的執(zhí)行f的三次調(diào)用,也就是同時執(zhí)行f(1,-1),f(2,-2),f(3,-3)時代碼該怎么寫呢?我們希望使用三個線程/進程,每個進程分別執(zhí)行一個函數(shù)調(diào)用。這時就可以看出delayed函數(shù)的用法了。
@delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [] for x,y in zip(x_list,y_list): res.append(f(x,y)) print(res)
執(zhí)行之后的結(jié)果為:
[(<function f at 0x00000000081C2950>, (1, -1), {}),
(<function f at 0x00000000081C2950>, (2, -2), {}),
(<function f at 0x00000000081C2950>, (3, -3), {})]
我們獲得了由(函數(shù),args,kwargs)這樣的元組組成的列表,我們可以為列表中的每一個元組(f,args,kwargs)分配給不同的線程,在每個線程里面執(zhí)行一個f(args,kwargs)。這樣是不是就可以完成并行的目的了呢?如何將元組分配給不同的線程由函數(shù)Parallel實現(xiàn),后面再講。我們現(xiàn)在只需要知道的delayed函數(shù)只是為了生成(函數(shù),args,kwargs)這樣的元組,暫緩函數(shù)的執(zhí)行,方便將各個計算過程分配給不同的線程。
tips:一般我們只想得到一個這樣的列表,并不想改變原始函數(shù)f,因此上面的修飾器寫法一般等價的寫為:
def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(res)
Parallel函數(shù)
看名字就知道Parallel主要的功能是實現(xiàn)程序并行。
因此在講Parallel之前,先看一下并行的實際處理流程:
Parallel實際上就是封裝了這個任務(wù)拆分,并行和結(jié)果合并的這個過程。其主要的功能是線程和進程的創(chuàng)建,和多任務(wù)執(zhí)行這個流程。
由于其源碼過于復(fù)雜,這里只看和調(diào)用相關(guān)的兩個部分:
(1)、初始化部分
Parallel的初始化主要是與程序并行的配置相關(guān),其函數(shù)定義為
class joblib.parallel(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto',temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None)
參數(shù)解釋:(參考:官方文檔)
- n_jobs: int, default: None —— 設(shè)置并行執(zhí)行任務(wù)的最大數(shù)量
當(dāng)backend="multiprocessing"時指python工作進程的數(shù)量,或者backend="threading"時指線程池大小。
當(dāng)n_jobs=-1時,使用所有的CPU執(zhí)行并行計算;
當(dāng)n_jobs=1時,就不會使用并行代碼,即等同于順序執(zhí)行,可以在debug情況下使用;
當(dāng)n_jobs<-1時,將會使用(n_cpus + 1 + n_jobs)個CPU,例如n_jobs=-2時,將會使用n_cpus-1個CPU核,其中n_cpus為CPU核的數(shù)量;
當(dāng)n_jobs=None的情況等同于n_jobs=1。
- backend: str, default: 'loky' —— 指定并行化后端的實現(xiàn)方法
backend='loky': 在與Python進程交換輸入和輸出數(shù)據(jù)時,可導(dǎo)致一些通信和內(nèi)存開銷。
backend='multiprocessing': 基于multiprocessing.Pool的后端,魯棒性不如loky。
backend='threading': threading是一個開銷非常低的backend。但是如果被調(diào)用的函數(shù)大量依賴于Python對象,它就會受到Python全局解釋器(GIL)鎖的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴展時,
“threading”非常有用(例如,封裝在“with nogil”塊中的Cython循環(huán),或者對庫(如NumPy)的大量調(diào)用)。
- verbose: int, 可選項 —— 執(zhí)行期間的信息顯示(ps:這個參數(shù)實際上有點類似log的日志級別,數(shù)字越大,顯示的程序執(zhí)行過程越詳細)
信息級別:如果非零,則打印進度消息。超過50,輸出被發(fā)送到stdout。消息的頻率隨著信息級別的增加而增加。如果大于10,則報告所有迭代。
- timeout: float, 可選項 —— 任務(wù)運行時間限制
timeout僅用在n_jobs != 1的情況下,用來限制每個任務(wù)完成的時間,如果任何任務(wù)的執(zhí)行超過這個限制值,將會引發(fā)“TimeOutError”錯誤。
- pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
預(yù)先分派的(任務(wù)的)批數(shù)(batches)。默認設(shè)置是“2 * n_jobs”。
ps: 這個參數(shù)有點難理解,直觀來說一下吧。pre_dispatch是預(yù)派遣的意思,就是提前先把任務(wù)派遣給各個處理器。
注意一下,這里的派遣并不是口頭的安排任務(wù),而是把任務(wù)和任務(wù)對應(yīng)的數(shù)據(jù)(劃重點)也發(fā)送給處理器。假設(shè)我們總共有12,3個處理器,如果不設(shè)置pre_dispatch,那么程序在開始時會一次性將所有任務(wù)都分配出去,一個處理器領(lǐng)到了四個任務(wù)。但是這個派遣過程需要為每個任務(wù)準備相應(yīng)的數(shù)據(jù),需要處理時間和占據(jù)內(nèi)存空間。
一次分配任務(wù)過多可能造成的后果就是準備數(shù)據(jù)的時間變長和可能造成內(nèi)存爆炸。所以設(shè)置一下預(yù)派遣任務(wù)的數(shù)量,減小預(yù)處理時間和內(nèi)存占用,保證處理器不空轉(zhuǎn)就行。
- batch_size: int or 'auto', default: 'auto' —— 一次性分派給每個worker的原子任務(wù)的數(shù)量
當(dāng)單個原子任務(wù)執(zhí)行非??鞎r,由于開銷的原因,使用dispatching的worker可能比順序計算慢。一起進行批量快速計算可以緩解這種情況。
“auto”策略會跟蹤一個批處理完成所需的時間,并動態(tài)調(diào)整batch_size大小,使用啟發(fā)式方法將時間保持在半秒以內(nèi)。初始batch_size為1。
batch_size="auto"且backend="threading時,將一次分派一個任務(wù)的batches,因為threading后端有非常小的開銷,使用更大的batch_size在這種情況下沒有證明帶來任何好處。
這個可以簡單的理解為每個處理器同時處理的任務(wù)數(shù)量,也就是一次分配每個處理的任務(wù)數(shù)量。
ps:插一句,嚴格從執(zhí)行順序來看實際上每個batch內(nèi)的原子任務(wù)還是順序執(zhí)行的(這點與深度學(xué)習(xí)框架中的batch是不同的),當(dāng)每個任務(wù)執(zhí)行時間非常短時,可以從邏輯上認為每個bacth內(nèi)的任務(wù)是同時執(zhí)行的。因此這個參數(shù)的應(yīng)用場景是每個任務(wù)單獨執(zhí)行的時間非常短,但是又希望使用并行加快速度的場景。
n_jobs,pre_dispatch,batch_size這三個參數(shù)要綜合理解。舉個例子,假設(shè)我們有24個任務(wù),設(shè)定n_jobs = 3(使用3個處理器),pre_disaptch = 2 * 3,batch_size = 2
那么實際上預(yù)派遣了2*3*bacth_size=12個任務(wù),也就是說隊列里有12個任務(wù)可供隨時選擇執(zhí)行。然后batch_size = 2,表示每個處理器需要拿走2個任務(wù),n_jobs=3的情況下,總共需要拿走n_jobs*batch_szie = 6個任務(wù)去執(zhí)行。這樣隊列里還有6個任務(wù)可供下次提取,保證了處理器不會空轉(zhuǎn)。
- temp_folder: str, 可選項
這個看名字就知道是臨時文件夾,存儲進程中的一些臨時緩存數(shù)據(jù)
- max_nbytes int, str, or None, optional, 1M by default
觸發(fā)緩存機制的閾值,當(dāng)worker中的numpy數(shù)組超過閾值時會觸發(fā)內(nèi)存映射
- mmap_mode: {None, 'r+', 'r', 'w+', 'c'}
映射后的數(shù)據(jù)的打開模式,就是常規(guī)的讀寫權(quán)限控制
- prefer: str in {'processes', 'threads'} or None, default: None
如果使用parallel_backend上下文管理器沒有選擇任何特定backend,則使用軟提示選擇默認backend。默認的基于進程(thread-based)的backend是“loky”,默認的基于線程的backend是“threading”。
如果指定了“backend”參數(shù),則忽略。
- require: 'sharedmem' or None, default None
硬約束選擇backend。如果設(shè)置為'sharedmem',即使用戶要求使用parallel_backend實現(xiàn)非基于線程的后端,所選backend也將是single-host和thread-based的。
使用示例:
from joblib import Parallel #4個線程 works_4 = Parallel(n_jobs=4,backend = 'threading') #2個進程 works_2 = Parallel(n_jobs=4,backend = 'multiprocessing')
(2)調(diào)用
Parallel實際上是一個類,但是實現(xiàn)了__call__方法,因此可以像函數(shù)一樣進行調(diào)用。
但是Parallel的輸入需要可迭代的對象,也就是一些任務(wù)的集合。這些任務(wù)由(function, args,kwargs)這種函數(shù)名和參數(shù)的方式表示。
from joblib import Parallel def f(x,y): return x+y works_2 = Parallel(n_jobs=2,backend = 'threading') task1 = [f, [2,3],{}] task2 = [f,[4],{'y':5}] res = works_2([task1,task2]) print(res) #out: res = [5,9]
上面的代碼我們先定義了一個2線程的并行處理器works_2,然后構(gòu)造了兩個任務(wù)task1和task2。[task1,task2]合并成一個任務(wù)集合作為works_2的參數(shù)進行執(zhí)行,最終我們也得到了2+3和4+5的正確結(jié)果。
我們可能會想任務(wù)集合中各個任務(wù)的函數(shù)可以不同嗎?答案當(dāng)然是可以的
from joblib import Parallel def f1(x,y): return x+y def f2(x,y): return y-x task1 = [f1, [4,10],{}] task2 = [f2,[4,10],{}] works_2 = Parallel(n_jobs=2,backend = 'threading') res = works_2([task1,task2]) print(res #out: res = [14,6]
講到這兒就可以基本寫出多任務(wù)并行的程序了,但是是不是覺得每次構(gòu)造一個任務(wù)集合非常的麻煩,需要指定函數(shù)名,參數(shù)啥的?;叵胍幌轮罢f的Dealyed函數(shù)不就是幫我們做這件事的嗎,自動創(chuàng)建一個由(函數(shù),args,kwargs)這樣的原子組成的列表。
from joblib import delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(tasks)
結(jié)果如下:
[(<function f at 0x00000000081C2950>, (1, -1), {}),
(<function f at 0x00000000081C2950>, (2, -2), {}),
(<function f at 0x00000000081C2950>, (3, -3), {})]
這下我們結(jié)合Parallel和delayed更為方便的創(chuàng)建并行程序
from joblib import Parallel,delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)] print(tasks) works_2 = Parallel(n_jobs=2,backend = 'threading') res = works_2(tasks) print(res)
為了程序簡潔,我們可以省略一些中間過程,上面的代碼也可以寫為:
from joblib import Parallel,delayed def f(x,y): return x+y x_list = [1,2,3] y_list = [-1,-2,-3] res = Parallel(n_jobs=2,backend = 'threading')([delayed(f)(x,y) for x,y in zip(x_list,y_list)] ) print(res)
這就是我們一般看到的示例了。
到此這篇關(guān)于Python并行庫joblib之delayed函數(shù)與Parallel函數(shù)詳解的文章就介紹到這了,更多相關(guān)python并行庫joblib內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于python for in if 連著寫與分開寫的區(qū)別說明
這篇文章主要介紹了基于python for in if 連著寫與分開寫的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03scratch3.0二次開發(fā)之用blocks生成python代碼
python是blockl.generator的一個實例,會調(diào)用generator里的方法,這篇文章主要介紹了scratch3.0二次開發(fā)之用blocks生成python代碼,需要的朋友可以參考下2021-08-08在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器
這篇文章主要介紹了在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器的方法,同時文中還介紹了使用Python的Django框架時mod_wsgi連接方式下可能遇到的問題的一般解決方法,需要的朋友可以參考下2015-12-12flask的orm框架SQLAlchemy查詢實現(xiàn)解析
這篇文章主要介紹了flask的orm框架SQLAlchemy查詢實現(xiàn)解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12