Python中的Joblib庫使用學習總結
Joblib
實踐環(huán)境
- python 3.6.2
- Joblib
簡介
Joblib是一組在Python中提供輕量級流水線的工具。特別是:
- 函數的透明磁盤緩存和延遲重新計算(記憶模式)
- 簡單易用的并行計算
Joblib已被優(yōu)化得很快速,很健壯了,特別是在大數據上,并對numpy數組進行了特定的優(yōu)化。
主要功能
1.輸出值的透明快速磁盤緩存(Transparent and fast disk-caching of output value):
Python函數的內存化或類似make的功能,適用于任意Python對象,包括非常大的numpy數組。
通過將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數,將持久性和流執(zhí)行邏輯與域邏輯或算法代碼分離開來。
Joblib可以將其計算保存到磁盤上,并僅在必要時重新運行:
>>> from joblib import Memory >>> cachedir = 'your_cache_dir_goes_here' >>> mem = Memory(cachedir) >>> import numpy as np >>> a = np.vander(np.arange(3)).astype(float) >>> square = mem.cache(np.square) >>> b = square(a) ______________________________________________________________________... [Memory] Calling square... square(array([[0., 0., 1.], [1., 1., 1.], [4., 2., 1.]])) _________________________________________________...square - ...s, 0.0min >>> c = square(a) # The above call did not trigger an evaluation
2.并行助手(parallel helper):
輕松編寫可讀的并行代碼并快速調試
>>> from joblib import Parallel, delayed >>> from math import sqrt >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) >>> res [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
3.快速壓縮的持久化(Fast compressed Persistence):
代替pickle在包含大數據的Python對象上高效工作( joblib.dump & joblib.load )。
parallel for loops
常見用法
Joblib提供了一個簡單的助手類,用于使用多進程為循環(huán)實現并行。核心思想是將要執(zhí)行的代碼編寫為生成器表達式,并將其轉換為并行計算
>>> from math import sqrt >>> [sqrt(i ** 2) for i in range(10)] [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
使用以下代碼,可以分布到2個CPU上:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
輸出可以是一個生成器,在可以獲取結果時立即返回結果,即使后續(xù)任務尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:
>>> from math import sqrt >>> from joblib import Parallel, delayed >>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator參數 >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10)) >>> print(type(output_generator)) <class 'generator'> >>> print(next(output_generator)) 0.0 >>> print(next(output_generator)) 1.0 >>> print(list(output_generator)) [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
此生成器允許減少joblib.Parallel的內存占用調用
基于線程的并行VS基于進程的并行
默認情況下, joblib.Parallel 使用 'loky' 后端模塊啟動單獨的Python工作進程,以便在分散的CPU上同時執(zhí)行任務。
對于一般的Python程序來說,這是一個合理的默認值,但由于輸入和輸出數據需要在隊列中序列化以便同工作進程進行通信,因此可能會導致大量開銷(請參閱序列化和進程)。
當你知道你調用的函數是基于一個已編譯的擴展,并且該擴展在大部分計算過程中釋放了Python全局解釋器鎖(GIL)時,使用線程而不是Python進程作為并發(fā)工作者會更有效。
例如,在Cython函數的with nogil 塊中編寫CPU密集型代碼。
如果希望代碼有效地使用線程,只需傳遞 preferre='threads' 作為 joblib.Parallel 構造函數的參數即可。在這種情況下,joblib將自動使用 "threading" 后端,而不是默認的 "loky" 后端
>>> Parallel(n_jobs=2, prefer=threads')( ... delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
也可以在上下文管理器的幫助下手動選擇特定的后端實現:
>>> from joblib import parallel_backend >>> with parallel_backend('threading', n_jobs=2): ... Parallel()(delayed(sqrt)(i ** 2) for i in range(10)) ... [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
后者在調用內部使用 joblib.Parallel 的庫時特別有用,不會將后端部分作為其公共API的一部分公開。
'loky' 后端可能并不總是可獲取。
一些罕見的系統(tǒng)不支持多處理(例如Pyodide)。在這種情況下,loky后端不可用,使用線程作為默認后端。
除了內置的joblib后端之外,還可以使用幾個特定于集群的后端:
- 用于Dask集群的Dask后端
- 用于Ray集群的Ray后端
- 用于Spark集群上分發(fā)joblib任務的Joblib Apache Spark Backend
序列化與進程
要在多個python進程之間共享函數定義,必須依賴序列化協(xié)議。python中的標準協(xié)議是 pickle ,但它在標準庫中的默認實現有幾個限制。例如,它不能序列化交互式定義的函數或在 __main__ 模塊中定義的函數。
為了避免這種限制, loky 后端現在依賴于 cloudpickle 以序列化python對象。 cloudpickle 是 pickle 協(xié)議的另一種實現方式,允許序列化更多的對象,特別是交互式定義的函數。因此,對于大多數用途, loky 后端應該可以完美的工作。
cloudpickle 的主要缺點就是它可能比標準類庫中的 pickle 慢,特別是,對于大型python字典或列表來說,這一點至關重要,因為它們的序列化時間可能慢100倍。有兩種方法可以更改 joblib 的序列化過程以緩和此問題:
- 如果您在UNIX系統(tǒng)上,則可以切換回舊的 multiprocessing 后端。有了這個后端,可以使用很快速的 pickle 在工作進程中共享交互式定義的函數。該解決方案的主要問題是,使用 fork 啟動進程會破壞標準POSIX,并可能與 numpy 和 openblas 等第三方庫進行非正常交互。
- 如果希望將 loky 后端與不同的序列化庫一起使用,則可以設置 LOKY_PICKLER=mod_pickle 環(huán)境變量,以使用 mod_pickle 作為 loky 的序列化庫。作為參數傳遞的模塊 mod_pickle 應按 import mod_picke 導入,并且應包含一個 Pickler 對象,該對象將用于序列化為對象。可以設置 LOKY_PICKLER=pickle 以使用表中類庫中的pickling模塊。 LOKY_PICKLER=pickle 的主要缺點是不能序列化交互式定義的函數。為了解決該問題,可以將此解決方案與 joblib.wrap_non_picklable_objects() 一起使用, joblib.wrap_non_picklable_objects() 可用作裝飾器以為特定對下本地啟用 cloudpickle 。通過這種方式,可以為所有python對象使用速度快的picking,并在本地為交互式函數啟用慢速的pickling。查閱loky_wrapper獲取示例。
共享內存語義
joblib的默認后端將在獨立的Python進程中運行每個函數調用,因此它們不能更改主程序中定義的公共Python對象。
然而,如果并行函數確實需要依賴于線程的共享內存語義,則應顯示的使用 require='sharemem' ,例如:
>>> shared_set = set() >>> def collect(x): ... shared_set.add(x) ... >>> Parallel(n_jobs=2, require='sharedmem')( ... delayed(collect)(i) for i in range(5)) [None, None, None, None, None] >>> sorted(shared_set) [0, 1, 2, 3, 4]
請記住,從性能的角度來看,依賴共享內存語義可能是次優(yōu)的,因為對共享Python對象的并發(fā)訪問將受到鎖爭用的影響。
注意,不使用共享內存的情況下,任務進程之間的內存資源是相互獨立的,舉例說明如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import threading from joblib import Parallel, delayed, parallel_backend from collections import deque GLOBAL_LIST = [] class TestClass(): def __init__(self): self.job_queue = deque() def add_jobs(self): i = 0 while i < 3: time.sleep(1) i += 1 GLOBAL_LIST.append(i) self.job_queue.append(i) print('obj_id:', id(self), 'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST) def get_job_queue_list(obj): i = 0 while not obj.job_queue and i < 3: time.sleep(1) i += 1 print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST) return obj.job_queue if __name__ == "__main__": obj = TestClass() def test_fun(): with parallel_backend("multiprocessing", n_jobs=2): Parallel()(delayed(get_job_queue_list)(obj) for i in range(2)) thread = threading.Thread(target=test_fun, name="parse_log") thread.start() time.sleep(1) obj.add_jobs() print('global_list_len:', len(GLOBAL_LIST))
控制臺輸出:
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3
通過輸出可知,通過joblib.Parallel開啟的進程,其占用內存和主線程占用的內存資源是相互獨立
復用worer池
一些算法需要對并行函數進行多次連續(xù)調用,同時對中間結果進行處理。在一個循環(huán)中多次調用 joblib.Parallel 次優(yōu)的,因為它會多次創(chuàng)建和銷毀一個workde(線程或進程)池,這可能會導致大量開銷。
在這種情況下,使用 joblib.Parallel 類的上下文管理器API更有效,以便對 joblib.Parallel 對象的多次調用可以復用同一worker池。
from joblib import Parallel, delayed from math import sqrt with Parallel(n_jobs=2) as parallel: accumulator = 0. n_iter = 0 while accumulator < 1000: results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5)) accumulator += sum(results) # synchronization barrier n_iter += 1 print(accumulator, n_iter) #輸出: 1136.5969161564717 14
請注意,現在基于進程的并行默認使用 'loky' 后端,該后端會自動嘗試自己維護和重用worker池,即使是在沒有上下文管理器的調用中也是如此
筆者實踐發(fā)現,即便采用這種實現方式,其運行效率也是非常低下的,應該盡量避免這種設計(實踐環(huán)境 Python3.6)
Parallel參考文檔
class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
常用參數說明
- n_jobs :int, 默認: None
并發(fā)運行作業(yè)的最大數量,例如當 backend='multiprocessing' 時Python工作進程的數量,或者當 backend='threading' 時線程池的大小。如果設置為 -1,則使用所有CPU。如果設置為1,則根本不使用并行計算代碼,并且行為相當于一個簡單的python for循環(huán)。此模式與 timeout 不兼容。如果 n_jobs 小于-1,則使用 (n_cpus+1+n_jobs) 。因此,如果 n_jobs=-2 ,將使用除一個CPU之外的所有CPU。如果為 None ,則默認 n_jobs=1 ,除非在 parallel_backend() 上下文管理器下執(zhí)行調用,此時會為 n_jobs 設置另一個值。
- backend : str, ParallelBackendBase 實例或者 None , 默認: 'loky'
指定并行化后端實現。支持的后端有:
- loky 在與工作Python進程交換輸入和輸出數據時,默認使用的 loky 可能會導致一些通信和內存開銷。在一些罕見的系統(tǒng)(如Pyiode)上, loky 后端可能不可用。
- multiprocessing 以前基于進程的后端,基于 multiprocessing.Pool 。不如loky健壯。
- threading 是一個開銷很低的后端,但如果被調用的函數大量依賴于Python對象,它會受到Python GIL的影響。當執(zhí)行瓶頸是顯式釋放GIL的已編譯擴展時, threading 最有用(例如, with-nogil 塊中封裝的Cython循環(huán)或對NumPy等庫的昂貴調用)。
- 最后,可以通過調用 register_pallel_backend() 來注冊后端。
不建議在類庫中調用 Parallel 時對 backend 名稱進行硬編碼,取而代之,建議設置軟提示( prefer )或硬約束( require ),以便庫用戶可以使用 parallel_backend() 上下文管理器從外部更改 backend 。
- return_generator : bool
如果為 True ,則對此實例的調用將返回一個生成器,并在結果可獲取時立即按原始順序返回結果。請注意,預期用途是一次運行一個調用。對同一個Parallel對象的多次調用將導致 RuntimeError
- prefer : str 可選值 ‘processes’ , ‘threads’ , None , 默認: None
如果使用 parallel_backen() 上下文管理器時沒有指定特定后端,則選擇默認 prefer 給定值。默認的基于進程的后端是 loky ,而默認的基于線程的后端則是 threading 。如果指定了 backend 參數,則忽略該參數。
- require : ‘sharedmem’ 或者 None , 默認 None
用于選擇后端的硬約束。如果設置為 'sharedmem' ,則所選后端將是單主機和基于線程的,即使用戶要求使用具有 parallel_backend 的非基于線程的后端。
到此這篇關于Python中的Joblib庫使用學習總結的文章就介紹到這了,更多相關Python中的Joblib庫內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
教你用Python+selenium搭建自動化測試環(huán)境
今天給大家?guī)淼氖顷P于Python的相關知識,文章圍繞著如何用Python+selenium搭建自動化測試環(huán)境展開,文中有非常詳細的介紹,需要的朋友可以參考下2021-06-06Python3.4學習筆記之 idle 清屏擴展插件用法分析
這篇文章主要介紹了Python3.4 idle 清屏擴展插件用法,簡單分析了idle清屏的幾種方法及idle清屏插件的相關使用技巧,需要的朋友可以參考下2019-03-03