欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python并行庫joblib之delayed函數(shù)與Parallel函數(shù)詳解

 更新時間:2023年08月24日 10:16:15   作者:goodxin_ie  
這篇文章主要介紹了Python并行庫joblib之delayed函數(shù)與Parallel函數(shù)詳解,Joblib就是一個可以簡單地將Python代碼轉(zhuǎn)換為并行計算模式的軟件包,它可非常簡單并行我們的程序,從而提高計算速度,需要的朋友可以參考下

Joblib

Joblib就是一個可以簡單地將Python代碼轉(zhuǎn)換為并行計算模式的軟件包,它可非常簡單并行我們的程序,從而提高計算速度。

主要提供了以下功能

  • 程序并行
  • 用于在每次使用相同的輸入?yún)?shù)調(diào)用函數(shù)時將其返回值緩存
  • 數(shù)據(jù)存儲(包括不可哈希的數(shù)據(jù)和大規(guī)模numpy數(shù)組)

程序并行

joblib提供了一個簡單地程序并行方案,主要有Parallel函數(shù)實現(xiàn),并涉及了一個技巧性的函數(shù)delayed。

delayed函數(shù)

  以下為delayed函數(shù)的源碼

def delayed(function):
    """Decorator used to capture the arguments of a function."""
    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        " functools.wraps fails on some callable objects "
    return delayed_function

*functools.wraps 旨在消除裝飾器對原函數(shù)造成的影響,即對原函數(shù)的相關(guān)屬性進行拷貝,已達到裝飾器不修改原函數(shù)的目的。

從功能上來說,可以認為被wrap修飾后的函數(shù)與原函數(shù)功能完全相同,暫時忽略不計

delayed函數(shù)顧名思義就是延遲函數(shù)的執(zhí)行。

根據(jù)源碼來看,delayed函數(shù)保留被修飾的函數(shù)function和參數(shù)*args, **kwargs,在碰到調(diào)用時,并不直接執(zhí)行函數(shù)function(*args, **kwargs),而是返回返回元組(function,args,kwargs)。

返回的這個結(jié)果留待其他函數(shù)執(zhí)行,在joblib里具體是與Parallel配合的。

下面我們通過具體例子看一下delayed函數(shù)如何工作的

import functools
def delayed(function):
    """Decorator used to capture the arguments of a function."""
    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        " functools.wraps fails on some callable objects "
    return delayed_function
def f(x,y):
    return x+y
res = delayed(f)(1,y=3)
print(res)

執(zhí)行結(jié)果為:

(<function f at 0x00000000081C29D8>, (1,), {'y': 3})

返回了原始的函數(shù)f和調(diào)用它是的兩個參數(shù)。

上面也說過delayed函數(shù)其實是一個修飾器,因此上面的代碼與下面的寫法等價

@delayed    
def f(x,y):  
    return x+y
res = f(1,y=3)
print(res)

*delayed之后并未得到函數(shù)的執(zhí)行結(jié)果,我們?nèi)绻氲玫筋A(yù)期的執(zhí)行結(jié)果應(yīng)該怎么做呢?其實delayed函數(shù)主要是與其他函數(shù)配合的,我們可以再寫一個程序進行計算:

def f(x,y):    
    return x+y
res = delayed(f)(1,y=2)
print(res)
#out: (<function f at 0x00000000081C2510>, (1,), {'y': 2})
foo,args,kwargs = res
final_res = foo(*args,**kwargs)
print(final_res)
#out: 3

我們先解包了delayed之后的結(jié)果res,得到了函數(shù)f,和參數(shù)args,kwargs,然后調(diào)用foo(*args,**kwargs)得到最終得到期待的計算結(jié)果的final_res。

開始可能很難理解我們?yōu)槭裁匆舆t一個函數(shù)的調(diào)用呢,調(diào)用函數(shù)不就是為了得到執(zhí)行結(jié)果嗎,這樣延遲之后不就無法達到我們的預(yù)期了?

先別急,看看下面這個例子應(yīng)該就明白了

假設(shè)我們需要對x_list = [1,2,3], y_list = [ -1,-2,-3] 這兩個列表進行逐個元素相加。

可能會覺得這不是很容易實現(xiàn)嗎,依次遍歷兩個集合的元素,在循環(huán)過程中調(diào)用函數(shù)f進行元素相加就好了,可能寫出的代碼如下:

def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
res = []
for x,y in zip(x_list,y_list):
    res.append(f(x,y))
print(res)

這樣線性的多次調(diào)用函數(shù)f當(dāng)然沒問題,但是如果我們并行的執(zhí)行f的三次調(diào)用,也就是同時執(zhí)行f(1,-1),f(2,-2),f(3,-3)時代碼該怎么寫呢?我們希望使用三個線程/進程,每個進程分別執(zhí)行一個函數(shù)調(diào)用。這時就可以看出delayed函數(shù)的用法了。

@delayed    
def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
res = []
for x,y in zip(x_list,y_list):
    res.append(f(x,y))
print(res)

執(zhí)行之后的結(jié)果為:

[(<function f at 0x00000000081C2950>, (1, -1), {}), 
(<function f at 0x00000000081C2950>, (2, -2), {}), 
(<function f at 0x00000000081C2950>, (3, -3), {})]

我們獲得了由(函數(shù),args,kwargs)這樣的元組組成的列表,我們可以為列表中的每一個元組(f,args,kwargs)分配給不同的線程,在每個線程里面執(zhí)行一個f(args,kwargs)。這樣是不是就可以完成并行的目的了呢?如何將元組分配給不同的線程由函數(shù)Parallel實現(xiàn),后面再講。我們現(xiàn)在只需要知道的delayed函數(shù)只是為了生成(函數(shù),args,kwargs)這樣的元組,暫緩函數(shù)的執(zhí)行,方便將各個計算過程分配給不同的線程。

tips:一般我們只想得到一個這樣的列表,并不想改變原始函數(shù)f,因此上面的修飾器寫法一般等價的寫為:

 
def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
res = [delayed(f)(x,y) for x,y in zip(x_list,y_list)]
print(res)

Parallel函數(shù)

看名字就知道Parallel主要的功能是實現(xiàn)程序并行。

因此在講Parallel之前,先看一下并行的實際處理流程:

Parallel實際上就是封裝了這個任務(wù)拆分,并行和結(jié)果合并的這個過程。其主要的功能是線程和進程的創(chuàng)建,和多任務(wù)執(zhí)行這個流程。

由于其源碼過于復(fù)雜,這里只看和調(diào)用相關(guān)的兩個部分:

(1)、初始化部分

Parallel的初始化主要是與程序并行的配置相關(guān),其函數(shù)定義為

class joblib.parallel(n_jobs=None, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', 
                   batch_size='auto',temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None)

參數(shù)解釋:(參考:官方文檔

  • n_jobs: int, default: None —— 設(shè)置并行執(zhí)行任務(wù)的最大數(shù)量

當(dāng)backend="multiprocessing"時指python工作進程的數(shù)量,或者backend="threading"時指線程池大小。
 當(dāng)n_jobs=-1時,使用所有的CPU執(zhí)行并行計算;
當(dāng)n_jobs=1時,就不會使用并行代碼,即等同于順序執(zhí)行,可以在debug情況下使用;
當(dāng)n_jobs<-1時,將會使用(n_cpus + 1 + n_jobs)個CPU,例如n_jobs=-2時,將會使用n_cpus-1個CPU核,其中n_cpus為CPU核的數(shù)量;
當(dāng)n_jobs=None的情況等同于n_jobs=1。

  • backend: str,  default: 'loky' —— 指定并行化后端的實現(xiàn)方法
backend='loky': 在與Python進程交換輸入和輸出數(shù)據(jù)時,可導(dǎo)致一些通信和內(nèi)存開銷。
 
backend='multiprocessing': 基于multiprocessing.Pool的后端,魯棒性不如loky。
 
backend='threading': threading是一個開銷非常低的backend。但是如果被調(diào)用的函數(shù)大量依賴于Python對象,它就會受到Python全局解釋器(GIL)鎖的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴展時,
“threading”非常有用(例如,封裝在“with nogil”塊中的Cython循環(huán),或者對庫(如NumPy)的大量調(diào)用)。
  • verbose: int, 可選項 —— 執(zhí)行期間的信息顯示(ps:這個參數(shù)實際上有點類似log的日志級別,數(shù)字越大,顯示的程序執(zhí)行過程越詳細)
信息級別:如果非零,則打印進度消息。超過50,輸出被發(fā)送到stdout。消息的頻率隨著信息級別的增加而增加。如果大于10,則報告所有迭代。
  • timeout: float, 可選項 —— 任務(wù)運行時間限制
timeout僅用在n_jobs != 1的情況下,用來限制每個任務(wù)完成的時間,如果任何任務(wù)的執(zhí)行超過這個限制值,將會引發(fā)“TimeOutError”錯誤。
  • pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
預(yù)先分派的(任務(wù)的)批數(shù)(batches)。默認設(shè)置是“2 * n_jobs”。

ps: 這個參數(shù)有點難理解,直觀來說一下吧。pre_dispatch是預(yù)派遣的意思,就是提前先把任務(wù)派遣給各個處理器。

注意一下,這里的派遣并不是口頭的安排任務(wù),而是把任務(wù)和任務(wù)對應(yīng)的數(shù)據(jù)(劃重點)也發(fā)送給處理器。假設(shè)我們總共有12,3個處理器,如果不設(shè)置pre_dispatch,那么程序在開始時會一次性將所有任務(wù)都分配出去,一個處理器領(lǐng)到了四個任務(wù)。但是這個派遣過程需要為每個任務(wù)準備相應(yīng)的數(shù)據(jù),需要處理時間和占據(jù)內(nèi)存空間。 

一次分配任務(wù)過多可能造成的后果就是準備數(shù)據(jù)的時間變長和可能造成內(nèi)存爆炸。所以設(shè)置一下預(yù)派遣任務(wù)的數(shù)量,減小預(yù)處理時間和內(nèi)存占用,保證處理器不空轉(zhuǎn)就行。

  • batch_size: int or 'auto', default: 'auto' —— 一次性分派給每個worker的原子任務(wù)的數(shù)量
當(dāng)單個原子任務(wù)執(zhí)行非??鞎r,由于開銷的原因,使用dispatching的worker可能比順序計算慢。一起進行批量快速計算可以緩解這種情況。
 
“auto”策略會跟蹤一個批處理完成所需的時間,并動態(tài)調(diào)整batch_size大小,使用啟發(fā)式方法將時間保持在半秒以內(nèi)。初始batch_size為1。
 
batch_size="auto"且backend="threading時,將一次分派一個任務(wù)的batches,因為threading后端有非常小的開銷,使用更大的batch_size在這種情況下沒有證明帶來任何好處。

這個可以簡單的理解為每個處理器同時處理的任務(wù)數(shù)量,也就是一次分配每個處理的任務(wù)數(shù)量。

ps:插一句,嚴格從執(zhí)行順序來看實際上每個batch內(nèi)的原子任務(wù)還是順序執(zhí)行的(這點與深度學(xué)習(xí)框架中的batch是不同的),當(dāng)每個任務(wù)執(zhí)行時間非常短時,可以從邏輯上認為每個bacth內(nèi)的任務(wù)是同時執(zhí)行的。因此這個參數(shù)的應(yīng)用場景是每個任務(wù)單獨執(zhí)行的時間非常短,但是又希望使用并行加快速度的場景。

n_jobs,pre_dispatch,batch_size這三個參數(shù)要綜合理解。舉個例子,假設(shè)我們有24個任務(wù),設(shè)定n_jobs = 3(使用3個處理器),pre_disaptch = 2 * 3,batch_size = 2

那么實際上預(yù)派遣了2*3*bacth_size=12個任務(wù),也就是說隊列里有12個任務(wù)可供隨時選擇執(zhí)行。然后batch_size = 2,表示每個處理器需要拿走2個任務(wù),n_jobs=3的情況下,總共需要拿走n_jobs*batch_szie = 6個任務(wù)去執(zhí)行。這樣隊列里還有6個任務(wù)可供下次提取,保證了處理器不會空轉(zhuǎn)。

  • temp_folder: str, 可選項
這個看名字就知道是臨時文件夾,存儲進程中的一些臨時緩存數(shù)據(jù)
  • max_nbytes int, str, or None, optional, 1M by default

觸發(fā)緩存機制的閾值,當(dāng)worker中的numpy數(shù)組超過閾值時會觸發(fā)內(nèi)存映射

  • mmap_mode: {None, 'r+', 'r', 'w+', 'c'}
映射后的數(shù)據(jù)的打開模式,就是常規(guī)的讀寫權(quán)限控制
  • prefer: str in {'processes', 'threads'} or None, default: None
如果使用parallel_backend上下文管理器沒有選擇任何特定backend,則使用軟提示選擇默認backend。默認的基于進程(thread-based)的backend是“loky”,默認的基于線程的backend是“threading”。
如果指定了“backend”參數(shù),則忽略。
  • require: 'sharedmem' or None, default None
硬約束選擇backend。如果設(shè)置為'sharedmem',即使用戶要求使用parallel_backend實現(xiàn)非基于線程的后端,所選backend也將是single-host和thread-based的。

使用示例:

from joblib import Parallel
#4個線程
works_4 = Parallel(n_jobs=4,backend = 'threading')
#2個進程
works_2 = Parallel(n_jobs=4,backend = 'multiprocessing')

(2)調(diào)用

Parallel實際上是一個類,但是實現(xiàn)了__call__方法,因此可以像函數(shù)一樣進行調(diào)用。

但是Parallel的輸入需要可迭代的對象,也就是一些任務(wù)的集合。這些任務(wù)由(function, args,kwargs)這種函數(shù)名和參數(shù)的方式表示。

from joblib import Parallel
def f(x,y):    
    return x+y
works_2 = Parallel(n_jobs=2,backend = 'threading')
task1 = [f, [2,3],{}]
task2 = [f,[4],{'y':5}]
res = works_2([task1,task2])
print(res)
#out: res = [5,9] 
 

上面的代碼我們先定義了一個2線程的并行處理器works_2,然后構(gòu)造了兩個任務(wù)task1和task2。[task1,task2]合并成一個任務(wù)集合作為works_2的參數(shù)進行執(zhí)行,最終我們也得到了2+3和4+5的正確結(jié)果。

我們可能會想任務(wù)集合中各個任務(wù)的函數(shù)可以不同嗎?答案當(dāng)然是可以的

from joblib import Parallel
def f1(x,y):    
    return x+y
def f2(x,y):    
    return y-x
task1 = [f1, [4,10],{}]
task2 = [f2,[4,10],{}]
works_2 = Parallel(n_jobs=2,backend = 'threading')
res = works_2([task1,task2])
print(res
#out: res = [14,6] 
 

講到這兒就可以基本寫出多任務(wù)并行的程序了,但是是不是覺得每次構(gòu)造一個任務(wù)集合非常的麻煩,需要指定函數(shù)名,參數(shù)啥的?;叵胍幌轮罢f的Dealyed函數(shù)不就是幫我們做這件事的嗎,自動創(chuàng)建一個由(函數(shù),args,kwargs)這樣的原子組成的列表。

from joblib import delayed
def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)]
print(tasks)

結(jié)果如下:

[(<function f at 0x00000000081C2950>, (1, -1), {}), 
(<function f at 0x00000000081C2950>, (2, -2), {}), 
(<function f at 0x00000000081C2950>, (3, -3), {})]

這下我們結(jié)合Parallel和delayed更為方便的創(chuàng)建并行程序

from joblib import Parallel,delayed
def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
tasks = [delayed(f)(x,y) for x,y in zip(x_list,y_list)]    
print(tasks)
works_2 = Parallel(n_jobs=2,backend = 'threading')
res = works_2(tasks)
print(res)

為了程序簡潔,我們可以省略一些中間過程,上面的代碼也可以寫為:

from joblib import Parallel,delayed
def f(x,y):    
    return x+y
x_list = [1,2,3]
y_list = [-1,-2,-3]
res = Parallel(n_jobs=2,backend = 'threading')([delayed(f)(x,y) for x,y in zip(x_list,y_list)] )  
print(res)

這就是我們一般看到的示例了。

到此這篇關(guān)于Python并行庫joblib之delayed函數(shù)與Parallel函數(shù)詳解的文章就介紹到這了,更多相關(guān)python并行庫joblib內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python上傳時包含boundary時的解決方法

    python上傳時包含boundary時的解決方法

    這篇文章主要介紹了python上傳時包含boundary時的解決方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-04-04
  • 基于python for in if 連著寫與分開寫的區(qū)別說明

    基于python for in if 連著寫與分開寫的區(qū)別說明

    這篇文章主要介紹了基于python for in if 連著寫與分開寫的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-03-03
  • scratch3.0二次開發(fā)之用blocks生成python代碼

    scratch3.0二次開發(fā)之用blocks生成python代碼

    python是blockl.generator的一個實例,會調(diào)用generator里的方法,這篇文章主要介紹了scratch3.0二次開發(fā)之用blocks生成python代碼,需要的朋友可以參考下
    2021-08-08
  • pytest中conftest.py使用小結(jié)

    pytest中conftest.py使用小結(jié)

    conftest.py文件是Pytest框架里面一個很重要的東西,本文主要介紹了pytest中conftest.py使用小結(jié),具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • Python離線安裝PIL 模塊的方法

    Python離線安裝PIL 模塊的方法

    今天小編就為大家分享一篇Python離線安裝PIL 模塊的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-01-01
  • python imread讀取文件失敗的問題及解決

    python imread讀取文件失敗的問題及解決

    這篇文章主要介紹了python imread讀取文件失敗的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • 在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器

    在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器

    這篇文章主要介紹了在Mac OS上使用mod_wsgi連接Python與Apache服務(wù)器的方法,同時文中還介紹了使用Python的Django框架時mod_wsgi連接方式下可能遇到的問題的一般解決方法,需要的朋友可以參考下
    2015-12-12
  • Python Matplotlib初階使用入門教程

    Python Matplotlib初階使用入門教程

    本文介紹Python Matplotlib庫的入門求生級使用方法,本文通過圖文實例相結(jié)合給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2021-11-11
  • python中range()與xrange()用法分析

    python中range()與xrange()用法分析

    這篇文章主要介紹了python中range()與xrange()用法,結(jié)合實例形式分析了range()與xrange()使用與效率上的區(qū)別,需要的朋友可以參考下
    2016-09-09
  • flask的orm框架SQLAlchemy查詢實現(xiàn)解析

    flask的orm框架SQLAlchemy查詢實現(xiàn)解析

    這篇文章主要介紹了flask的orm框架SQLAlchemy查詢實現(xiàn)解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12

最新評論