Python中的Joblib庫(kù)使用學(xué)習(xí)總結(jié)
Joblib
實(shí)踐環(huán)境
- python 3.6.2
- Joblib
簡(jiǎn)介
Joblib是一組在Python中提供輕量級(jí)流水線的工具。特別是:
- 函數(shù)的透明磁盤緩存和延遲重新計(jì)算(記憶模式)
- 簡(jiǎn)單易用的并行計(jì)算
Joblib已被優(yōu)化得很快速,很健壯了,特別是在大數(shù)據(jù)上,并對(duì)numpy數(shù)組進(jìn)行了特定的優(yōu)化。
主要功能
1.輸出值的透明快速磁盤緩存(Transparent and fast disk-caching of output value):
Python函數(shù)的內(nèi)存化或類似make的功能,適用于任意Python對(duì)象,包括非常大的numpy數(shù)組。
通過(guò)將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數(shù),將持久性和流執(zhí)行邏輯與域邏輯或算法代碼分離開(kāi)來(lái)。
Joblib可以將其計(jì)算保存到磁盤上,并僅在必要時(shí)重新運(yùn)行:
>>> from joblib import Memory
>>> cachedir = 'your_cache_dir_goes_here'
>>> mem = Memory(cachedir)
>>> import numpy as np
>>> a = np.vander(np.arange(3)).astype(float)
>>> square = mem.cache(np.square)
>>> b = square(a)
______________________________________________________________________...
[Memory] Calling square...
square(array([[0., 0., 1.],
[1., 1., 1.],
[4., 2., 1.]]))
_________________________________________________...square - ...s, 0.0min
>>> c = square(a)
# The above call did not trigger an evaluation2.并行助手(parallel helper):
輕松編寫可讀的并行代碼并快速調(diào)試
>>> from joblib import Parallel, delayed >>> from math import sqrt >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) >>> res [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
3.快速壓縮的持久化(Fast compressed Persistence):
代替pickle在包含大數(shù)據(jù)的Python對(duì)象上高效工作( joblib.dump & joblib.load )。
parallel for loops
常見(jiàn)用法
Joblib提供了一個(gè)簡(jiǎn)單的助手類,用于使用多進(jìn)程為循環(huán)實(shí)現(xiàn)并行。核心思想是將要執(zhí)行的代碼編寫為生成器表達(dá)式,并將其轉(zhuǎn)換為并行計(jì)算
>>> from math import sqrt >>> [sqrt(i ** 2) for i in range(10)] [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
使用以下代碼,可以分布到2個(gè)CPU上:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
輸出可以是一個(gè)生成器,在可以獲取結(jié)果時(shí)立即返回結(jié)果,即使后續(xù)任務(wù)尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator參數(shù) >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) <class 'generator'> >>> print(next(output_generator)) 0.0 >>> print(next(output_generator)) 1.0 >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
此生成器允許減少joblib.Parallel的內(nèi)存占用調(diào)用
基于線程的并行VS基于進(jìn)程的并行
默認(rèn)情況下, joblib.Parallel 使用 'loky' 后端模塊啟動(dòng)單獨(dú)的Python工作進(jìn)程,以便在分散的CPU上同時(shí)執(zhí)行任務(wù)。
對(duì)于一般的Python程序來(lái)說(shuō),這是一個(gè)合理的默認(rèn)值,但由于輸入和輸出數(shù)據(jù)需要在隊(duì)列中序列化以便同工作進(jìn)程進(jìn)行通信,因此可能會(huì)導(dǎo)致大量開(kāi)銷(請(qǐng)參閱序列化和進(jìn)程)。
當(dāng)你知道你調(diào)用的函數(shù)是基于一個(gè)已編譯的擴(kuò)展,并且該擴(kuò)展在大部分計(jì)算過(guò)程中釋放了Python全局解釋器鎖(GIL)時(shí),使用線程而不是Python進(jìn)程作為并發(fā)工作者會(huì)更有效。
例如,在Cython函數(shù)的with nogil 塊中編寫CPU密集型代碼。
如果希望代碼有效地使用線程,只需傳遞 preferre='threads' 作為 joblib.Parallel 構(gòu)造函數(shù)的參數(shù)即可。在這種情況下,joblib將自動(dòng)使用 "threading" 后端,而不是默認(rèn)的 "loky" 后端
>>> Parallel(n_jobs=2, prefer=threads')( ... delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
也可以在上下文管理器的幫助下手動(dòng)選擇特定的后端實(shí)現(xiàn):
>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
... Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
...
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]后者在調(diào)用內(nèi)部使用 joblib.Parallel 的庫(kù)時(shí)特別有用,不會(huì)將后端部分作為其公共API的一部分公開(kāi)。
'loky' 后端可能并不總是可獲取。
一些罕見(jiàn)的系統(tǒng)不支持多處理(例如Pyodide)。在這種情況下,loky后端不可用,使用線程作為默認(rèn)后端。
除了內(nèi)置的joblib后端之外,還可以使用幾個(gè)特定于集群的后端:
- 用于Dask集群的Dask后端
- 用于Ray集群的Ray后端
- 用于Spark集群上分發(fā)joblib任務(wù)的Joblib Apache Spark Backend
序列化與進(jìn)程
要在多個(gè)python進(jìn)程之間共享函數(shù)定義,必須依賴序列化協(xié)議。python中的標(biāo)準(zhǔn)協(xié)議是 pickle ,但它在標(biāo)準(zhǔn)庫(kù)中的默認(rèn)實(shí)現(xiàn)有幾個(gè)限制。例如,它不能序列化交互式定義的函數(shù)或在 __main__ 模塊中定義的函數(shù)。
為了避免這種限制, loky 后端現(xiàn)在依賴于 cloudpickle 以序列化python對(duì)象。 cloudpickle 是 pickle 協(xié)議的另一種實(shí)現(xiàn)方式,允許序列化更多的對(duì)象,特別是交互式定義的函數(shù)。因此,對(duì)于大多數(shù)用途, loky 后端應(yīng)該可以完美的工作。
cloudpickle 的主要缺點(diǎn)就是它可能比標(biāo)準(zhǔn)類庫(kù)中的 pickle 慢,特別是,對(duì)于大型python字典或列表來(lái)說(shuō),這一點(diǎn)至關(guān)重要,因?yàn)樗鼈兊男蛄谢瘯r(shí)間可能慢100倍。有兩種方法可以更改 joblib 的序列化過(guò)程以緩和此問(wèn)題:
- 如果您在UNIX系統(tǒng)上,則可以切換回舊的 multiprocessing 后端。有了這個(gè)后端,可以使用很快速的 pickle 在工作進(jìn)程中共享交互式定義的函數(shù)。該解決方案的主要問(wèn)題是,使用 fork 啟動(dòng)進(jìn)程會(huì)破壞標(biāo)準(zhǔn)POSIX,并可能與 numpy 和 openblas 等第三方庫(kù)進(jìn)行非正常交互。
- 如果希望將 loky 后端與不同的序列化庫(kù)一起使用,則可以設(shè)置 LOKY_PICKLER=mod_pickle 環(huán)境變量,以使用 mod_pickle 作為 loky 的序列化庫(kù)。作為參數(shù)傳遞的模塊 mod_pickle 應(yīng)按 import mod_picke 導(dǎo)入,并且應(yīng)包含一個(gè) Pickler 對(duì)象,該對(duì)象將用于序列化為對(duì)象??梢栽O(shè)置 LOKY_PICKLER=pickle 以使用表中類庫(kù)中的pickling模塊。 LOKY_PICKLER=pickle 的主要缺點(diǎn)是不能序列化交互式定義的函數(shù)。為了解決該問(wèn)題,可以將此解決方案與 joblib.wrap_non_picklable_objects() 一起使用, joblib.wrap_non_picklable_objects() 可用作裝飾器以為特定對(duì)下本地啟用 cloudpickle 。通過(guò)這種方式,可以為所有python對(duì)象使用速度快的picking,并在本地為交互式函數(shù)啟用慢速的pickling。查閱loky_wrapper獲取示例。
共享內(nèi)存語(yǔ)義
joblib的默認(rèn)后端將在獨(dú)立的Python進(jìn)程中運(yùn)行每個(gè)函數(shù)調(diào)用,因此它們不能更改主程序中定義的公共Python對(duì)象。
然而,如果并行函數(shù)確實(shí)需要依賴于線程的共享內(nèi)存語(yǔ)義,則應(yīng)顯示的使用 require='sharemem' ,例如:
>>> shared_set = set() >>> def collect(x): ... shared_set.add(x) ... >>> Parallel(n_jobs=2, require='sharedmem')( ... delayed(collect)(i) for i in range(5)) [None, None, None, None, None] >>> sorted(shared_set) [0, 1, 2, 3, 4]
請(qǐng)記住,從性能的角度來(lái)看,依賴共享內(nèi)存語(yǔ)義可能是次優(yōu)的,因?yàn)閷?duì)共享Python對(duì)象的并發(fā)訪問(wèn)將受到鎖爭(zhēng)用的影響。
注意,不使用共享內(nèi)存的情況下,任務(wù)進(jìn)程之間的內(nèi)存資源是相互獨(dú)立的,舉例說(shuō)明如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from joblib import Parallel, delayed, parallel_backend
from collections import deque
GLOBAL_LIST = []
class TestClass():
def __init__(self):
self.job_queue = deque()
def add_jobs(self):
i = 0
while i < 3:
time.sleep(1)
i += 1
GLOBAL_LIST.append(i)
self.job_queue.append(i)
print('obj_id:', id(self), 'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST)
def get_job_queue_list(obj):
i = 0
while not obj.job_queue and i < 3:
time.sleep(1)
i += 1
print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST)
return obj.job_queue
if __name__ == "__main__":
obj = TestClass()
def test_fun():
with parallel_backend("multiprocessing", n_jobs=2):
Parallel()(delayed(get_job_queue_list)(obj) for i in range(2))
thread = threading.Thread(target=test_fun, name="parse_log")
thread.start()
time.sleep(1)
obj.add_jobs()
print('global_list_len:', len(GLOBAL_LIST))控制臺(tái)輸出:
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3
通過(guò)輸出可知,通過(guò)joblib.Parallel開(kāi)啟的進(jìn)程,其占用內(nèi)存和主線程占用的內(nèi)存資源是相互獨(dú)立
復(fù)用worer池
一些算法需要對(duì)并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,同時(shí)對(duì)中間結(jié)果進(jìn)行處理。在一個(gè)循環(huán)中多次調(diào)用 joblib.Parallel 次優(yōu)的,因?yàn)樗鼤?huì)多次創(chuàng)建和銷毀一個(gè)workde(線程或進(jìn)程)池,這可能會(huì)導(dǎo)致大量開(kāi)銷。
在這種情況下,使用 joblib.Parallel 類的上下文管理器API更有效,以便對(duì) joblib.Parallel 對(duì)象的多次調(diào)用可以復(fù)用同一worker池。
from joblib import Parallel, delayed
from math import sqrt
with Parallel(n_jobs=2) as parallel:
accumulator = 0.
n_iter = 0
while accumulator < 1000:
results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
accumulator += sum(results) # synchronization barrier
n_iter += 1
print(accumulator, n_iter) #輸出: 1136.5969161564717 14 請(qǐng)注意,現(xiàn)在基于進(jìn)程的并行默認(rèn)使用 'loky' 后端,該后端會(huì)自動(dòng)嘗試自己維護(hù)和重用worker池,即使是在沒(méi)有上下文管理器的調(diào)用中也是如此
筆者實(shí)踐發(fā)現(xiàn),即便采用這種實(shí)現(xiàn)方式,其運(yùn)行效率也是非常低下的,應(yīng)該盡量避免這種設(shè)計(jì)(實(shí)踐環(huán)境 Python3.6)
Parallel參考文檔
class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))常用參數(shù)說(shuō)明
- n_jobs :int, 默認(rèn): None
并發(fā)運(yùn)行作業(yè)的最大數(shù)量,例如當(dāng) backend='multiprocessing' 時(shí)Python工作進(jìn)程的數(shù)量,或者當(dāng) backend='threading' 時(shí)線程池的大小。如果設(shè)置為 -1,則使用所有CPU。如果設(shè)置為1,則根本不使用并行計(jì)算代碼,并且行為相當(dāng)于一個(gè)簡(jiǎn)單的python for循環(huán)。此模式與 timeout 不兼容。如果 n_jobs 小于-1,則使用 (n_cpus+1+n_jobs) 。因此,如果 n_jobs=-2 ,將使用除一個(gè)CPU之外的所有CPU。如果為 None ,則默認(rèn) n_jobs=1 ,除非在 parallel_backend() 上下文管理器下執(zhí)行調(diào)用,此時(shí)會(huì)為 n_jobs 設(shè)置另一個(gè)值。
- backend : str, ParallelBackendBase 實(shí)例或者 None , 默認(rèn): 'loky'
指定并行化后端實(shí)現(xiàn)。支持的后端有:
- loky 在與工作Python進(jìn)程交換輸入和輸出數(shù)據(jù)時(shí),默認(rèn)使用的 loky 可能會(huì)導(dǎo)致一些通信和內(nèi)存開(kāi)銷。在一些罕見(jiàn)的系統(tǒng)(如Pyiode)上, loky 后端可能不可用。
- multiprocessing 以前基于進(jìn)程的后端,基于 multiprocessing.Pool 。不如loky健壯。
- threading 是一個(gè)開(kāi)銷很低的后端,但如果被調(diào)用的函數(shù)大量依賴于Python對(duì)象,它會(huì)受到Python GIL的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴(kuò)展時(shí), threading 最有用(例如, with-nogil 塊中封裝的Cython循環(huán)或?qū)umPy等庫(kù)的昂貴調(diào)用)。
- 最后,可以通過(guò)調(diào)用 register_pallel_backend() 來(lái)注冊(cè)后端。
不建議在類庫(kù)中調(diào)用 Parallel 時(shí)對(duì) backend 名稱進(jìn)行硬編碼,取而代之,建議設(shè)置軟提示( prefer )或硬約束( require ),以便庫(kù)用戶可以使用 parallel_backend() 上下文管理器從外部更改 backend 。
- return_generator : bool
如果為 True ,則對(duì)此實(shí)例的調(diào)用將返回一個(gè)生成器,并在結(jié)果可獲取時(shí)立即按原始順序返回結(jié)果。請(qǐng)注意,預(yù)期用途是一次運(yùn)行一個(gè)調(diào)用。對(duì)同一個(gè)Parallel對(duì)象的多次調(diào)用將導(dǎo)致 RuntimeError
- prefer : str 可選值 ‘processes’ , ‘threads’ , None , 默認(rèn): None
如果使用 parallel_backen() 上下文管理器時(shí)沒(méi)有指定特定后端,則選擇默認(rèn) prefer 給定值。默認(rèn)的基于進(jìn)程的后端是 loky ,而默認(rèn)的基于線程的后端則是 threading 。如果指定了 backend 參數(shù),則忽略該參數(shù)。
- require : ‘sharedmem’ 或者 None , 默認(rèn) None
用于選擇后端的硬約束。如果設(shè)置為 'sharedmem' ,則所選后端將是單主機(jī)和基于線程的,即使用戶要求使用具有 parallel_backend 的非基于線程的后端。
到此這篇關(guān)于Python中的Joblib庫(kù)使用學(xué)習(xí)總結(jié)的文章就介紹到這了,更多相關(guān)Python中的Joblib庫(kù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python基礎(chǔ)學(xué)習(xí)之常見(jiàn)的內(nèi)建函數(shù)整理
所謂的內(nèi)建函數(shù),可以直接使用,而不需要import。下面這篇文章主要給大家整理介紹了關(guān)于Python基礎(chǔ)學(xué)習(xí)之常見(jiàn)的一些內(nèi)建函數(shù),文中通過(guò)示例代碼為大家介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09
python實(shí)現(xiàn)簡(jiǎn)易通訊錄修改版
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)簡(jiǎn)易通訊錄的修改版,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03
教你用Python+selenium搭建自動(dòng)化測(cè)試環(huán)境
今天給大家?guī)?lái)的是關(guān)于Python的相關(guān)知識(shí),文章圍繞著如何用Python+selenium搭建自動(dòng)化測(cè)試環(huán)境展開(kāi),文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06
Python3.4學(xué)習(xí)筆記之 idle 清屏擴(kuò)展插件用法分析
這篇文章主要介紹了Python3.4 idle 清屏擴(kuò)展插件用法,簡(jiǎn)單分析了idle清屏的幾種方法及idle清屏插件的相關(guān)使用技巧,需要的朋友可以參考下2019-03-03
Python中選擇結(jié)構(gòu)實(shí)例講解
在本篇文章里小編給大家整理了關(guān)于Python選擇結(jié)構(gòu)的基礎(chǔ)知識(shí)點(diǎn)及相關(guān)實(shí)例,有需要的朋友們可以學(xué)習(xí)參考下。2022-11-11
在Python中實(shí)現(xiàn)決策樹(shù)算法的示例代碼
決策樹(shù)(Decision Tree)是一種常見(jiàn)的機(jī)器學(xué)習(xí)算法,被廣泛應(yīng)用于分類和回歸任務(wù)中,并且再其之上的隨機(jī)森林和提升樹(shù)等算法一直是表格領(lǐng)域的最佳模型,所以本文將介紹理解其數(shù)學(xué)概念,并在Python中動(dòng)手實(shí)現(xiàn),這可以作為了解這類算法的基礎(chǔ)知識(shí)2023-08-08

