Python Joblib庫使用方法案例總結(jié)
實(shí)踐環(huán)境
python 3.6.2
Joblib
簡介
Joblib是一組在Python中提供輕量級流水線的工具。特別是:
- 函數(shù)的透明磁盤緩存和延遲重新計(jì)算(記憶模式)
- 簡單易用的并行計(jì)算
Joblib已被優(yōu)化得很快速,很健壯了,特別是在大數(shù)據(jù)上,并對numpy數(shù)組進(jìn)行了特定的優(yōu)化。
主要功能
輸出值的透明快速磁盤緩存(Transparent and fast disk-caching of output value): Python函數(shù)的內(nèi)存化或類似make的功能,適用于任意Python對象,包括非常大的numpy數(shù)組。通過將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數(shù),將持久性和流執(zhí)行邏輯與域邏輯或算法代碼分離開來。Joblib可以將其計(jì)算保存到磁盤上,并僅在必要時(shí)重新運(yùn)行:
原文:
Transparent and fast disk-caching of output value: a memoize or make-like functionality for Python functions that works well for arbitrary Python objects, including very large numpy arrays. Separate persistence and flow-execution logic from domain logic or algorithmic code by writing the operations as a set of steps with well-defined inputs and outputs: Python functions. Joblib can save their computation to disk and rerun it only if necessary:
>>> from joblib import Memory >>> cachedir = 'your_cache_dir_goes_here' >>> mem = Memory(cachedir) >>> import numpy as np >>> a = np.vander(np.arange(3)).astype(float) >>> square = mem.cache(np.square) >>> b = square(a) ______________________________________________________________________... [Memory] Calling square... square(array([[0., 0., 1.], [1., 1., 1.], [4., 2., 1.]])) _________________________________________________...square - ...s, 0.0min >>> c = square(a) # The above call did not trigger an evaluation
并行助手(parallel helper):輕松編寫可讀的并行代碼并快速調(diào)試
>>> from joblib import Parallel, delayed >>> from math import sqrt >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) >>> res [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
快速壓縮的持久化(Fast compressed Persistence):代替pickle在包含大數(shù)據(jù)的Python對象上高效工作(
joblib.dump
&joblib.load
)。
parallel for loops
常見用法
Joblib提供了一個(gè)簡單的助手類,用于使用多進(jìn)程為循環(huán)實(shí)現(xiàn)并行。核心思想是將要執(zhí)行的代碼編寫為生成器表達(dá)式,并將其轉(zhuǎn)換為并行計(jì)算
>>> from math import sqrt >>> [sqrt(i ** 2) for i in range(10)] [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
使用以下代碼,可以分布到2個(gè)CPU上:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
輸出可以是一個(gè)生成器,在可以獲取結(jié)果時(shí)立即返回結(jié)果,即使后續(xù)任務(wù)尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator參數(shù) >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) <class 'generator'> >>> print(next(output_generator)) 0.0 >>> print(next(output_generator)) 1.0 >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
此生成器允許減少joblib.Parallel的內(nèi)存占用調(diào)用
基于線程的并行VS基于進(jìn)程的并行
默認(rèn)情況下,joblib.Parallel
使用'loky'
后端模塊啟動單獨(dú)的Python工作進(jìn)程,以便在分散的CPU上同時(shí)執(zhí)行任務(wù)。對于一般的Python程序來說,這是一個(gè)合理的默認(rèn)值,但由于輸入和輸出數(shù)據(jù)需要在隊(duì)列中序列化以便同工作進(jìn)程進(jìn)行通信,因此可能會導(dǎo)致大量開銷(請參閱序列化和進(jìn)程)。
當(dāng)你知道你調(diào)用的函數(shù)是基于一個(gè)已編譯的擴(kuò)展,并且該擴(kuò)展在大部分計(jì)算過程中釋放了Python全局解釋器鎖(GIL)時(shí),使用線程而不是Python進(jìn)程作為并發(fā)工作者會更有效。例如,在Cython函數(shù)的with nogil 塊中編寫CPU密集型代碼。
如果希望代碼有效地使用線程,只需傳遞preferre='threads'
作為joblib.Parallel
構(gòu)造函數(shù)的參數(shù)即可。在這種情況下,joblib將自動使用"threading"
后端,而不是默認(rèn)的"loky"
后端
>>> Parallel(n_jobs=2, prefer=threads')( ... delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
也可以在上下文管理器的幫助下手動選擇特定的后端實(shí)現(xiàn):
>>> from joblib import parallel_backend >>> with parallel_backend('threading', n_jobs=2): ... Parallel()(delayed(sqrt)(i ** 2) for i in range(10)) ... [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
后者在調(diào)用內(nèi)部使用joblib.Parallel
的庫時(shí)特別有用,不會將后端部分作為其公共API的一部分公開。
'loky'
后端可能并不總是可獲取。
一些罕見的系統(tǒng)不支持多處理(例如Pyodide)。在這種情況下,loky后端不可用,使用線程作為默認(rèn)后端。
除了內(nèi)置的joblib后端之外,還可以使用幾個(gè)特定于集群的后端:
- 用于Dask集群的Dask后端 (查閱Using Dask for single-machine parallel computing 以獲取示例),
- 用于Ray集群的Ray后端
- 用于Spark集群上分發(fā)joblib任務(wù)的Joblib Apache Spark Backend
序列化與進(jìn)程
要在多個(gè)python進(jìn)程之間共享函數(shù)定義,必須依賴序列化協(xié)議。python中的標(biāo)準(zhǔn)協(xié)議是pickle ,但它在標(biāo)準(zhǔn)庫中的默認(rèn)實(shí)現(xiàn)有幾個(gè)限制。例如,它不能序列化交互式定義的函數(shù)或在__main__
模塊中定義的函數(shù)。
為了避免這種限制,loky
后端現(xiàn)在依賴于cloudpickle以序列化python對象。cloudpickle
是pickle
協(xié)議的另一種實(shí)現(xiàn)方式,允許序列化更多的對象,特別是交互式定義的函數(shù)。因此,對于大多數(shù)用途,loky
后端應(yīng)該可以完美的工作。
cloudpickle
的主要缺點(diǎn)就是它可能比標(biāo)準(zhǔn)類庫中的pickle
慢,特別是,對于大型python字典或列表來說,這一點(diǎn)至關(guān)重要,因?yàn)樗鼈兊男蛄谢瘯r(shí)間可能慢100倍。有兩種方法可以更改 joblib
的序列化過程以緩和此問題:
如果您在UNIX系統(tǒng)上,則可以切換回舊的
multiprocessing
后端。有了這個(gè)后端,可以使用很快速的pickle
在工作進(jìn)程中共享交互式定義的函數(shù)。該解決方案的主要問題是,使用fork
啟動進(jìn)程會破壞標(biāo)準(zhǔn)POSIX,并可能與numpy
和openblas
等第三方庫進(jìn)行非正常交互。如果希望將
loky
后端與不同的序列化庫一起使用,則可以設(shè)置LOKY_PICKLER=mod_pickle
環(huán)境變量,以使用mod_pickle
作為loky
的序列化庫。作為參數(shù)傳遞的模塊mod_pickle
應(yīng)按import mod_picke
導(dǎo)入,并且應(yīng)包含一個(gè)Pickler
對象,該對象將用于序列化為對象??梢栽O(shè)置LOKY_PICKLER=pickle
以使用表中類庫中的pickling模塊。LOKY_PICKLER=pickle
的主要缺點(diǎn)是不能序列化交互式定義的函數(shù)。為了解決該問題,可以將此解決方案與joblib.wrap_non_picklable_objects() 一起使用,joblib.wrap_non_picklable_objects()
可用作裝飾器以為特定對下本地啟用cloudpickle。通過這種方式,可以為所有python對象使用速度快的picking,并在本地為交互式函數(shù)啟用慢速的pickling。查閱loky_wrapper獲取示例。
共享內(nèi)存語義
joblib的默認(rèn)后端將在獨(dú)立的Python進(jìn)程中運(yùn)行每個(gè)函數(shù)調(diào)用,因此它們不能更改主程序中定義的公共Python對象。
然而,如果并行函數(shù)確實(shí)需要依賴于線程的共享內(nèi)存語義,則應(yīng)顯示的使用require='sharemem'
,例如:
>>> shared_set = set() >>> def collect(x): ... shared_set.add(x) ... >>> Parallel(n_jobs=2, require='sharedmem')( ... delayed(collect)(i) for i in range(5)) [None, None, None, None, None] >>> sorted(shared_set) [0, 1, 2, 3, 4]
請記住,從性能的角度來看,依賴共享內(nèi)存語義可能是次優(yōu)的,因?yàn)閷蚕鞵ython對象的并發(fā)訪問將受到鎖爭用的影響。
注意,不使用共享內(nèi)存的情況下,任務(wù)進(jìn)程之間的內(nèi)存資源是相互獨(dú)立的,舉例說明如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from joblib import Parallel, delayed, parallel_backend from collections import deque GLOBAL_LIST = [] class TestClass(): def __init__(self): self.job_queue = deque() def add_jobs(self): i = 0 while i < 3: time.sleep(1) i += 1 GLOBAL_LIST.append(i) self.job_queue.append(i) print('obj_id:', id(self), 'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST) def get_job_queue_list(obj): i = 0 while not obj.job_queue and i < 3: time.sleep(1) i += 1 print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST) return obj.job_queue if __name__ == "__main__": obj = TestClass() def test_fun(): with parallel_backend("multiprocessing", n_jobs=2): Parallel()(delayed(get_job_queue_list)(obj) for i in range(2)) thread = threading.Thread(target=test_fun, name="parse_log") thread.start() time.sleep(1) obj.add_jobs() print('global_list_len:', len(GLOBAL_LIST))
控制臺輸出:
obj_id: 1554577912664 job_queue: deque([]) global_list: [] obj_id: 1930069893920 job_queue: deque([]) global_list: [] obj_id: 2378500766968 job_queue: deque([1]) global_list: [1] obj_id: 1554577912664 job_queue: deque([]) global_list: [] obj_id: 1930069893920 job_queue: deque([]) global_list: [] obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2] obj_id: 1554577912664 job_queue: deque([]) global_list: [] obj_id: 1930069893920 job_queue: deque([]) global_list: [] obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3] global_list_len: 3
通過輸出可知,通過joblib.Parallel開啟的進(jìn)程,其占用內(nèi)存和主線程占用的內(nèi)存資源是相互獨(dú)立
復(fù)用worer池
一些算法需要對并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,同時(shí)對中間結(jié)果進(jìn)行處理。在一個(gè)循環(huán)中多次調(diào)用joblib.Parallel
次優(yōu)的,因?yàn)樗鼤啻蝿?chuàng)建和銷毀一個(gè)workde(線程或進(jìn)程)池,這可能會導(dǎo)致大量開銷。
在這種情況下,使用joblib.Parallel
類的上下文管理器API更有效,以便對joblib.Parallel
對象的多次調(diào)用可以復(fù)用同一worker池。
from joblib import Parallel, delayed from math import sqrt with Parallel(n_jobs=2) as parallel: accumulator = 0. n_iter = 0 while accumulator < 1000: results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5)) accumulator += sum(results) # synchronization barrier n_iter += 1 print(accumulator, n_iter) #輸出: 1136.5969161564717 14
請注意,現(xiàn)在基于進(jìn)程的并行默認(rèn)使用'loky'
后端,該后端會自動嘗試自己維護(hù)和重用worker池,即使是在沒有上下文管理器的調(diào)用中也是如此
筆者實(shí)踐發(fā)現(xiàn),即便采用這種實(shí)現(xiàn)方式,其運(yùn)行效率也是非常低下的,應(yīng)該盡量避免這種設(shè)計(jì)(實(shí)踐環(huán)境 Python3.6)
...略
Parallel參考文檔
class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
常用參數(shù)說明
n_jobs
:int, 默認(rèn):None
并發(fā)運(yùn)行作業(yè)的最大數(shù)量,例如當(dāng)
backend='multiprocessing'
時(shí)Python工作進(jìn)程的數(shù)量,或者當(dāng)backend='threading'
時(shí)線程池的大小。如果設(shè)置為 -1,則使用所有CPU。如果設(shè)置為1,則根本不使用并行計(jì)算代碼,并且行為相當(dāng)于一個(gè)簡單的python for循環(huán)。此模式與timeout
不兼容。如果n_jobs
小于-1,則使用(n_cpus+1+n_jobs)
。因此,如果n_jobs=-2
,將使用除一個(gè)CPU之外的所有CPU。如果為None
,則默認(rèn)n_jobs=1
,除非在parallel_backend()
上下文管理器下執(zhí)行調(diào)用,此時(shí)會為n_jobs
設(shè)置另一個(gè)值。backend
: str,ParallelBackendBase
實(shí)例或者None
, 默認(rèn):'loky'
指定并行化后端實(shí)現(xiàn)。支持的后端有:
loky
在與工作Python進(jìn)程交換輸入和輸出數(shù)據(jù)時(shí),默認(rèn)使用的loky
可能會導(dǎo)致一些通信和內(nèi)存開銷。在一些罕見的系統(tǒng)(如Pyiode)上,loky
后端可能不可用。multiprocessing
以前基于進(jìn)程的后端,基于multiprocessing.Pool
。不如loky健壯。threading
是一個(gè)開銷很低的后端,但如果被調(diào)用的函數(shù)大量依賴于Python對象,它會受到Python GIL的影響。當(dāng)執(zhí)行瓶頸是顯式釋放GIL的已編譯擴(kuò)展時(shí),threading
最有用(例如,with-nogil
塊中封裝的Cython循環(huán)或?qū)umPy等庫的昂貴調(diào)用)。最后,可以通過調(diào)用
register_pallel_backend()
來注冊后端。
不建議在類庫中調(diào)用
Parallel
時(shí)對backend
名稱進(jìn)行硬編碼,取而代之,建議設(shè)置軟提示(prefer
)或硬約束(require
),以便庫用戶可以使用parallel_backend()
上下文管理器從外部更改backend
。return_generator
: bool如果為
True
,則對此實(shí)例的調(diào)用將返回一個(gè)生成器,并在結(jié)果可獲取時(shí)立即按原始順序返回結(jié)果。請注意,預(yù)期用途是一次運(yùn)行一個(gè)調(diào)用。對同一個(gè)Parallel對象的多次調(diào)用將導(dǎo)致RuntimeError
prefer
: str 可選值‘processes’
,‘threads’
,None
, 默認(rèn):None
如果使用
parallel_backen()
上下文管理器時(shí)沒有指定特定后端,則選擇默認(rèn)prefer
給定值。默認(rèn)的基于進(jìn)程的后端是loky
,而默認(rèn)的基于線程的后端則是threading
。如果指定了backend
參數(shù),則忽略該參數(shù)。require
:‘sharedmem’
或者None
, 默認(rèn)None
用于選擇后端的硬約束。如果設(shè)置為
'sharedmem'
,則所選后端將是單主機(jī)和基于線程的,即使用戶要求使用具有parallel_backend
的非基于線程的后端。
參考文檔
https://joblib.readthedocs.io/en/latest/
https://joblib.readthedocs.io/
https://joblib.readthedocs.io/en/latest/parallel.html#common-usage
Python Joblib庫是一個(gè)非常實(shí)用的工具庫,可以幫助用戶更高效地處理數(shù)據(jù)。它提供了并行計(jì)算和數(shù)據(jù)預(yù)處理的功能,可以大大縮短計(jì)算時(shí)間,提高計(jì)算效率。使用Joblib庫,用戶只需要簡單地調(diào)用相關(guān)函數(shù)即可完成任務(wù),非常方便。在實(shí)際應(yīng)用中,Joblib庫可以幫助用戶處理大量數(shù)據(jù),提高數(shù)據(jù)處理的效率和準(zhǔn)確性。因此,學(xué)習(xí)和掌握J(rèn)oblib庫的使用方法對于數(shù)據(jù)科學(xué)從業(yè)者來說非常重要。
到此這篇關(guān)于Python Joblib庫使用方法案例總結(jié)的文章就介紹到這了,更多相關(guān)Python Joblib庫總結(jié)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Tensorflow加載預(yù)訓(xùn)練模型和保存模型的實(shí)例
今天小編就為大家分享一篇Tensorflow加載預(yù)訓(xùn)練模型和保存模型的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07Python TensorFlow介紹與實(shí)戰(zhàn)
這篇文章介紹了Python TensorFlow介紹與實(shí)戰(zhàn),通過本文的介紹,我們不僅了解了TensorFlow的基本概念和安裝方法,還通過線性回歸和卷積神經(jīng)網(wǎng)絡(luò)的實(shí)例,深入探討了 TensorFlow 的使用技巧,TensorFlow 的強(qiáng)大功能和靈活性使其成為深度學(xué)習(xí)領(lǐng)域的重要工具,需要的朋友可以參考下2024-07-07Python虛擬環(huán)境的創(chuàng)建和使用詳解
這篇文章主要給大家介紹了關(guān)于Python虛擬環(huán)境的創(chuàng)建和使用的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09利用Tensorflow的隊(duì)列多線程讀取數(shù)據(jù)方式
今天小編就為大家分享一篇利用Tensorflow的隊(duì)列多線程讀取數(shù)據(jù)方式,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-02-02Python pydotplus安裝及可視化圖形創(chuàng)建教程
這篇文章主要為大家介紹了Python pydotplus安裝及可視化圖形創(chuàng)建教程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10對pandas中時(shí)間窗函數(shù)rolling的使用詳解
今天小編就為大家分享一篇對pandas中時(shí)間窗函數(shù)rolling的使用詳解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-11-11