Python并行計算庫Joblib高效使用指北
Joblib是用于高效并行計算的Python開源庫,其提供了簡單易用的內存映射和并行計算的工具,以將任務分發(fā)到多個工作進程中。Joblib庫特別適合用于需要進行重復計算或大規(guī)模數(shù)據(jù)處理的任務。Joblib庫的官方倉庫見:joblib,官方文檔見:joblib-doc。
Jolib庫安裝代碼如下:
pip install joblib
# 查看版本 import joblib joblib.__version__
'1.4.2'
1 使用說明
Joblib庫主要功能涵蓋以下三大塊:
- 記憶模式:Memory類將函數(shù)的返回值緩存到磁盤。下次調用時,如果輸入?yún)?shù)不變,就直接從緩存中加載結果,避免重復計算。
- 并行計算:Parallel類將任務拆分到多個進程或者線程中并行執(zhí)行,加速計算過程。
- 高效的序列化:針對NumPy數(shù)組等大型數(shù)據(jù)對象進行了優(yōu)化,且序列化和反序列化速度快。
1.1 Memory類
Joblib庫的Memory類支持通過記憶模式,將函數(shù)的計算結果存儲起來,以便在下次使用時直接調用。這種機制的優(yōu)勢在于加速計算過程、節(jié)約資源以及簡化管理。
Memory類構造函數(shù)如下:
class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)
參數(shù)介紹如下:
- location: 緩存文件的存放位置。如果設置為 None,則不緩存。
- backend: 緩存的后端存儲方式。默認是 "local",表示使用本地文件系統(tǒng)。
- mmap_mode: 一個字符串,表示內存映射文件的模式(None, ‘r+’, ‘r’, ‘w+’, ‘c’)。
- compress: 表示是否壓縮緩存文件。壓縮可以節(jié)省磁盤空間,但會增加 I/O 操作的時間。
- verbose: 一個整數(shù),表示日志的詳細程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調試信息。
- bytes_limit: 一個整數(shù)或 None,表示緩存使用的字節(jié)數(shù)限制。如果緩存超過了這個限制,最舊的緩存文件將被刪除。
- backend_options: 傳遞給緩存后端的選項。
Memory類簡單使用
下面代碼展示第一次調用函數(shù)并緩存結果:
from joblib import Memory import os, shutil # 創(chuàng)建一個Memory對象,指定緩存目錄為當前目錄下的run文件夾 # verbose=0表示關閉詳細輸出 cachedir = './run' if os.path.exists(cachedir): shutil.rmtree(cachedir) memory = Memory(cachedir, verbose=0) # 使用@memory.cache裝飾器,將函數(shù)f的結果緩存起來 @memory.cache def f(x): # 只有當函數(shù)的輸入?yún)?shù)x沒有被緩存時,才會執(zhí)行函數(shù)體內的代碼 print('Running f(%s)' % x) return x # 第一次調用f(1),會執(zhí)行函數(shù)體內的代碼,并將結果緩存起來 print(f(1))
Running f(1) 1
第二次調用函數(shù):
# 第二次調用f(1),由于結果已經被緩存,不會再次執(zhí)行函數(shù)體內的代碼,而是直接從緩存中讀取結果 print(f(1))
1
調用其他函數(shù):
# 調用f(2),由于輸入?yún)?shù)不同,會再次執(zhí)行函數(shù)體內的代碼,并將結果緩存起來 print(f(2))
Running f(2) 2
將Memory類應用于numpy數(shù)組
import numpy as np from joblib import Memory import os, shutil cachedir = './run' if os.path.exists(cachedir): shutil.rmtree(cachedir) memory = Memory(cachedir, verbose=0) @memory.cache def g(x): print('A long-running calculation, with parameter %s' % x) # 返回漢明窗 return np.hamming(x) @memory.cache def h(x): print('A second long-running calculation, using g(x)') # 生成范德蒙德矩陣 return np.vander(x) # 調用函數(shù)g,傳入?yún)?shù)3,并將結果存儲在變量a中 a = g(3) # 打印變量a的值 print(a) # 再次調用函數(shù)g,傳入相同的參數(shù)3,由于結果已被緩存,不會重新計算 print(g(3))
A long-running calculation, with parameter 3 [0.08 1. 0.08] [0.08 1. 0.08]
直接計算和緩存結果是等同的:
# 調用函數(shù)h,傳入變量a作為參數(shù),并將結果存儲在變量b中 b = h(a) # 再次調用函數(shù)h,傳入相同的參數(shù)a,由于結果已被緩存,不會重新計算 b2 = h(a) # 使用numpy的allclose函數(shù)檢查b和b2是否足夠接近,即它們是否相等 print(np.allclose(b, b2))
A second long-running calculation, using g(x) True
直接調用緩存結果
import numpy as np from joblib import Memory import os, shutil # 設置緩存目錄的路徑。 cachedir = './run' # 檢查緩存目錄是否存在。 if os.path.exists(cachedir): # 如果緩存目錄存在,使用shutil.rmtree刪除該目錄及其內容。 shutil.rmtree(cachedir) # 初始化Memory對象,設置緩存目錄為上面定義的cachedir,mmap_mode設置為'r',表示只讀模式。 memory = Memory(cachedir, mmap_mode='r', verbose=0) # 使用memory.cache裝飾器緩存np.square函數(shù)的結果。 square = memory.cache(np.square) a = np.vander(np.arange(3)).astype(float) # 打印通過square函數(shù)處理后的矩陣a。 print(square(a)) # 獲取a的緩存結果 result = square.call_and_shelve(a) print(result.get()) # 獲取并打印緩存的結果。
[[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]] [[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]]
類中使用緩存
Memory類不建議將其直接用于類方法。如果想在類中使用緩存,建議的模式是在類中使用單獨定義的緩存函數(shù),如下所示:
@memory.cache def compute_func(arg1, arg2, arg3): pass class Foo(object): def __init__(self, args): self.data = None def compute(self): # 類中調用緩存的函數(shù) self.data = compute_func(self.arg1, self.arg2, 40)
1.2 Parallel類
Joblib庫的Parallel類用于簡單快速將任務分解為多個子任務,并分配到不同的CPU核心或機器上執(zhí)行,從而顯著提高程序的運行效率。
Parallel類構造函數(shù)及主要參數(shù)如下:
class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))
參數(shù)介紹如下:
- n_jobs: 指定并行任務的數(shù)量,為-1時表示使用所有可用的CPU核心;為None時表示使用單個進程。
- backend:指定并行化的后端,可選項:
- 'loky':使用loky庫實現(xiàn)多進程,該庫由joblib開發(fā)者開發(fā),默認選項。
- 'threading':使用threading庫實現(xiàn)多線程。
- 'multiprocessing':使用multiprocessing庫實現(xiàn)多進程。
- return_as:返回結果格式,可選項:
- 'list:列表。
- generator:按照任務提交順序生成結果的生成器。
- generator_unordered:按照執(zhí)行結果完成先后順序的生成器。
- verbose: 一個整數(shù),表示日志的詳細程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調試信息。
- timeout:單個任務最大運行時長,超時將引發(fā)TimeOutError。僅適用于n_jobs不為1的情況。
- batch_size:當Parallel類執(zhí)行任務時,會將任務分批處理。batch_size參數(shù)決定了每個批次中包含的任務數(shù)。
- pre_dispatch: 用來決定在并行計算開始之前,每個批次有多少個任務會被預先準備好并等待被分配給單個工作進程。默認值為“2*n_jobs”,表示并行計算時可以使用2倍工作進程的任務數(shù)量。
- temp_folder:指定臨時文件的存儲路徑。
- max_nbytes:傳遞給工作程序的數(shù)組大小的閾值。
- require:對運行任務的要求,可選None和sharedmem。sharedmem表示將使用共享內存來執(zhí)行并行任務,但會影響計算性能。
簡單示例
以下代碼展示了單線程直接運行計算密集型任務結果:
from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個計算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時間 time.sleep(1) # 數(shù)值計算 result = np.sum(np.square(data)) return result # 生成一些模擬數(shù)據(jù) # 設置隨機數(shù)生成器的種子 np.random.seed(42) data = np.random.rand(10, 1000) # 10個1000維的向量 results = [compute_heavy_task(d) for d in data] # 打印結果的和 print(f"結果: {sum(results)}") print(f"耗時:{time.time()-start}s")
結果: 3269.16485027708
耗時:10.101513624191284s
以下代碼展示利用Parallel類創(chuàng)建多進程運行計算密集型任務結果:
from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個計算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時間 time.sleep(1) # 數(shù)值計算 result = np.sum(np.square(data)) return result # 設置隨機數(shù)生成器的種子 np.random.seed(42) # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個1000維的向量 # 使用Parallel來并行執(zhí)行任務 results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data) # 打印結果的和 print(f"結果: {sum(results)}") print(f"耗時:{time.time()-start}s")
結果: 3269.16485027708
耗時:2.381772041320801s
可以看到joblib庫利用多進程技術顯著提高了任務執(zhí)行的效率。然而,當面對I/O密集型任務或執(zhí)行時間極短的任務時,多線程或多進程的優(yōu)勢可能并不明顯。這是因為線程創(chuàng)建和上下文切換的開銷有時可能超過任務本身的執(zhí)行時間。以上述的compute_heavy_task函數(shù)為例,如果移除了其中的time.sleep函數(shù),多進程執(zhí)行所需的時間將會顯著增加。
此外獲取當前系統(tǒng)的cpu核心數(shù)(邏輯處理器)代碼如下:
import joblib # 獲取當前系統(tǒng)的cpu核心數(shù) n_cores = joblib.cpu_count() print(f'系統(tǒng)的核心數(shù)是:{n_cores}')
系統(tǒng)的核心數(shù)是:16
不同并行方式對比
以下代碼展示了不同并行方式在Parallel類中的應用。默認使用loky多進程:
# 使用loky多進程 from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個計算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時間 time.sleep(1) # 數(shù)值計算 result = np.sum(np.square(data)) return result # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個1000維的向量 results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data) # 打印結果的和 print(f"結果: {sum(results)}") print(f"耗時:{time.time()-start}s")
結果: 3382.3336437893217
耗時:2.042675256729126s
以下代碼展示了threading多線程的使用,注意由于Python的全局解釋器鎖(GIL)確保在任何時刻只有一個線程執(zhí)行Python字節(jié)碼。這表明即使在多核處理器上,Python的線程也無法實現(xiàn)真正的并行計算。然而,當涉及到處理I/O密集型任務或需要快速響應的小規(guī)模任務時,多線程依然具有優(yōu)勢:
# 使用threading多線程 start = time.time() results = Parallel(n_jobs=8, return_as="generator", backend = 'threading')(delayed(compute_heavy_task)(d) for d in data) # 打印結果的和 print(f"結果: {sum(results)}") print(f"耗時:{time.time()-start}s")
結果: 3382.3336437893217
耗時:2.040527105331421s
以下代碼展示了multiprocessing多進程的使用,注意Windows下需要將multiprocessing相關代碼放在main函數(shù)中:
from joblib import Parallel, delayed import numpy as np import time # 定義一個計算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時間 time.sleep(1) # 數(shù)值計算 result = np.sum(np.square(data)) return result def main(): start = time.time() # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個1000維的向量 # multiprocessing不支持返回rgenerator results = Parallel(n_jobs=8, return_as="list", backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data) # 打印結果的和 print(f"結果: {sum(results)}") print(f"耗時:{time.time()-start}s") if __name__ == '__main__': main()
結果: 3304.6651996375645
耗時:2.4303956031799316s
以下是loky
、threading
和 multiprocessing
的一些關鍵特性對比:
特性/庫 | loky | threading | multiprocessing |
---|---|---|---|
適用平臺 | 跨平臺 | 跨平臺 | 跨平臺,但Windows上存在限制 |
進程/線程模型 | 進程 | 線程 | 進程 |
GIL影響 | 無 | 有 | 無 |
適用場景 | CPU密集型任務 | I/O密集型任務 | CPU密集型任務 |
啟動開銷 | 較小 | 較小 | 較大 |
內存使用 | 較高 | 較低 | 較高 |
進程間通信 | 通過管道、隊列等 | 通過共享數(shù)據(jù)結構 | 通過管道、隊列等 |
線程間通信 | 共享數(shù)據(jù)結構 | 共享數(shù)據(jù)結構 | 不適用 |
異常處理 | 進程間獨立 | 線程間共享 | 進程間獨立 |
調試難度 | 較高 | 較低 | 較高 |
適用框架 | 通用 | 通用 | 通用 |
Python中線程和進程簡單對比如下:
- 資源共享:線程共享同一進程的內存和資源,而進程擁有獨立的內存空間。
- GIL影響:線程受GIL限制,進程不受GIL限制。
- 開銷:線程的創(chuàng)建和切換開銷小,進程的創(chuàng)建和切換開銷大。
- 適用性:線程適合I/O密集型任務,進程適合CPU密集型任務。
- 通信:線程間通信簡單但需要處理同步問題,進程間通信復雜但天然隔離。
在實際應用中,選擇使用線程還是進程取決于任務的特性和性能需求。如果任務主要是I/O密集型,使用線程可以提高性能;如果任務是CPU密集型,使用進程可以更好地利用多核處理器的計算能力。
共享內存
默認情況下,Parallel類執(zhí)行任務時各個任務不共享內存,如下所示:
from joblib import Parallel, delayed shared_set = set() def collect(x): shared_set.add(x) Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[]
通過設置require='sharedmem'可以實現(xiàn)內存共享:
# require='sharedmem'表示需要共享內存,以確保多個進程可以訪問shared_set集合 Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[0, 1, 2, 3, 4]
上下文管理器
一些算法需要對一個并行函數(shù)進行多次連續(xù)調用,但在循環(huán)中多次調用joblib.Parallel是次優(yōu)的,因為這將多次創(chuàng)建和銷毀一組工作進程,從而導致顯著的性能開銷。
對于這種情況,使用joblib.Parallel類的上下文管理器API更為高效,可以重用同一組工作進程進行多次調用joblib.Parallel對象。如下所示:
from joblib import Parallel, delayed import math with Parallel(n_jobs=2) as parallel: accumulator = 0. n_iter = 0 while accumulator < 1000: results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5)) accumulator += sum(results) n_iter += 1 print(accumulator, n_iter)
1136.5969161564717 14
parallel_config
Joblib提供parallel_config類用于配置并行執(zhí)行的參數(shù),比如并行的后端類型、批處理大小等,這些配置可以影響后續(xù)所有的parallel實例。它通常在調用Parallel類之前使用。關于parallel_config使用見:parallel_config。
1.3 序列化
joblib.dump()和joblib.load()提供了一種替代pickle庫的方法,可以高效地序列化處理包含大量數(shù)據(jù)的任意Python對象,特別是大型的NumPy數(shù)組。關于pickle庫使用見:Python數(shù)據(jù)序列化模塊pickle使用筆記 。兩者效果對比見:
特點 | pickle | joblib |
---|---|---|
性能 | 一般 | 針對NumPy數(shù)組等大數(shù)據(jù)類型有優(yōu)化,通常更快 |
并行處理 | 不支持 | 內置并行處理功能,可以加速任務 |
內存映射 | 不支持 | 支持內存映射,可以高效處理大文件 |
壓縮 | 支持 | 支持壓縮,可以減少存儲空間 |
附加功能 | 少 | 提供了一些額外的功能,如緩存、延遲加載等 |
以下代碼展示了joblib.dump的基本使用:
from tempfile import mkdtemp # 使用mkdtemp創(chuàng)建一個臨時目錄,并將目錄路徑存儲在變量savedir中。 savedir = mkdtemp(dir='./') import os # 文件保存路徑 filename = os.path.join(savedir, 'test.joblib') import numpy as np import pandas as pd import joblib # 創(chuàng)建一個要持久化的字典 to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))] # 使用joblib.dump函數(shù)將to_persist字典序列化并保存到filename指定的文件中 # 注意pickle庫無法序列化numpy數(shù)據(jù) joblib.dump(to_persist, filename)
['./tmp82ms1z5w\\test.joblib']
使用joblib.load函數(shù)從指定的文件中加載之前保存的序列化數(shù)據(jù):
joblib.load(filename)
[('a', [1, 2, 3]), ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])), ('c', 0 1 2 3 4 0 1.0 1.0 1.0 1.0 1.0 1 1.0 1.0 1.0 1.0 1.0 2 1.0 1.0 1.0 1.0 1.0 3 1.0 1.0 1.0 1.0 1.0 4 1.0 1.0 1.0 1.0 1.0)]
joblib.dump和joblib.load函數(shù)還接受文件對象:
with open(filename, 'wb') as fo: # 使用joblib將對象to_persist序列化并寫入文件 joblib.dump(to_persist, fo) with open(filename, 'rb') as fo: joblib.load(fo)
此外joblib.dump也支持設置compress參數(shù)以實現(xiàn)數(shù)據(jù)壓縮:
# compress參數(shù)為壓縮級別,取值為0到9,值越大壓縮效果越好。為0時表示不壓縮,默認值為0 joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\test.joblib']
默認情況下,joblib.dump使用zlib壓縮方法,因為它在速度和磁盤空間之間實現(xiàn)了最佳平衡。其他支持的壓縮方法包括“gzip”、“bz2”、“lzma”和“xz”。compress參數(shù)輸入帶有壓縮方法和壓縮級別就可以選擇不同壓縮方法:
joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3)) joblib.load(filename + '.gz') joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5)) joblib.load(filename + '.bz2')
[('a', [1, 2, 3]), ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])), ('c', 0 1 2 3 4 0 1.0 1.0 1.0 1.0 1.0 1 1.0 1.0 1.0 1.0 1.0 2 1.0 1.0 1.0 1.0 1.0 3 1.0 1.0 1.0 1.0 1.0 4 1.0 1.0 1.0 1.0 1.0)]
除了默認壓縮方法,lz4壓縮算法也可以用于數(shù)據(jù)壓縮。前提是需要安裝lz4壓縮庫:
pip install lz4
在這些壓縮方法中,lz4和默認方法效果較好。lz4使用方式與其他壓縮方式一樣:
joblib.dump(to_persist, filename, compress=('lz4', 3))
['./tmp82ms1z5w\\test.joblib']
???????2 實例
2.1 joblib緩存和并行
本實例展示了利用joblib緩存和并行來加速任務執(zhí)行。以下代碼展示了一個高耗時任務:
# 導入time模塊,用于實現(xiàn)延時功能 import time # 定義一個模擬耗時計算的函數(shù) def costly_compute(data, column): # 休眠1秒,模擬耗時操作 time.sleep(1) # 返回傳入數(shù)據(jù)的指定列 return data[column] # 定義一個計算數(shù)據(jù)列平均值的函數(shù) def data_processing_mean(data, column): # 調用costly_compute函數(shù)獲取指定列的數(shù)據(jù) column_data = costly_compute(data, column) # 計算并返回該列數(shù)據(jù)的平均值 return column_data.mean() # 導入numpy庫,并設置隨機數(shù)生成器的種子,以保證結果的可復現(xiàn)性 import numpy as np rng = np.random.RandomState(42) # 生成1000行4列的隨機數(shù)據(jù)矩陣 data = rng.randn(int(1000), 4) # 記錄開始時間 start = time.time() # 對數(shù)據(jù)的每一列計算平均值,并將結果存儲在results列表中 results = [data_processing_mean(data, col) for col in range(data.shape[1])] # 記錄結束時間 stop = time.time() # 打印處理過程的描述信息 print('\nSequential processing') # 打印整個處理過程的耗時 print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
Sequential processing Elapsed time for the entire processing: 4.05 s
下段代碼演示了如何使用joblib庫來緩存和并行化計算上述任務:
# 導入time模塊,用于模擬耗時操作。 import time # 定義一個使用緩存的函數(shù),用于計算數(shù)據(jù)的均值。 def data_processing_mean_using_cache(data, column): return costly_compute_cached(data, column).mean() # 從joblib庫導入Memory類,用于緩存函數(shù)的輸出。 from joblib import Memory # 設置緩存的存儲位置和詳細程度 location = './cachedir' memory = Memory(location, verbose=0) # 使用Memory對象的cache方法來緩存costly_compute函數(shù)的輸出。 costly_compute_cached = memory.cache(costly_compute) # 從joblib庫導入Parallel和delayed類,用于并行執(zhí)行函數(shù)。 from joblib import Parallel, delayed # 記錄開始時間。 start = time.time() # 使用Parallel類并行執(zhí)行data_processing_mean_using_cache函數(shù),對數(shù)據(jù)的每一列進行處理。 results = Parallel(n_jobs=2)( delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])) # 記錄結束時間。 stop = time.time() # 打印第一輪處理的耗時信息,包括緩存數(shù)據(jù)的時間。 print('\nFirst round - caching the data') print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
First round - caching the data Elapsed time for the entire processing: 2.05 s
再次執(zhí)行相同的過程,可以看到結果被緩存而不是重新執(zhí)行函數(shù):
start = time.time() results = Parallel(n_jobs=2)( delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])) stop = time.time() print('\nSecond round - reloading from the cache') print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start)) # 如果不想使用緩存結果,可以清除緩存信息 memory.clear(warn=False)
Second round - reloading from the cache Elapsed time for the entire processing: 0.02 s
???????2.2 序列化
以下示例展示了在joblib.Parallel中使用序列化內存映射(numpy.memmap)。內存映射可以將大型數(shù)據(jù)集分割成小塊,并在需要時將其加載到內存中。這種方法可以減少內存使用,并提高處理速度。
定義耗時函數(shù):
import numpy as np data = np.random.random((int(1e7),)) window_size = int(5e5) slices = [slice(start, start + window_size) for start in range(0, data.size - window_size, int(1e5))] import time def slow_mean(data, sl): time.sleep(0.01) return data[sl].mean()
以下代碼是直接調用函數(shù)的運行結果:
tic = time.time() results = [slow_mean(data, sl) for sl in slices] toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.49 s
以下代碼是調用Parallel類2個進程運行的結果,由于整體任務計算耗時較少。所以Parallel類并行計算并沒有比直接調用函數(shù)有太多速度優(yōu)勢,因為進程啟動銷毀需要額外時間:
from joblib import Parallel, delayed tic = time.time() results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices) toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.00 s
以下代碼提供了joblib.dump和load函數(shù)加速數(shù)據(jù)讀取。其中dump函數(shù)用于將data對象序列化并保存到磁盤上的文件中,同時創(chuàng)建了一個內存映射,使得該文件可以像內存數(shù)組一樣被訪問。當程序再次加載這個文件時,可以使用load函數(shù)以內存映射模式打開:
import os from joblib import dump, load # 從joblib庫導入dump和load函數(shù),用于創(chuàng)建和加載內存映射文件 # 設置內存映射文件的文件夾路徑 folder = './memmap' os.makedirs(folder, exist_ok = True) # 將內存映射文件的名稱與路徑結合 data_filename_memmap = os.path.join(folder, 'data_memmap.joblib') # 使用dump函數(shù)將數(shù)據(jù)對象'data'保存到內存映射文件 dump(data, data_filename_memmap) # 使用load函數(shù)加載內存映射文件,mmap_mode='r'表示以只讀模式打開 data_ = load(data_filename_memmap, mmap_mode='r') # 記錄開始時間 tic = time.time() results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices) # 記錄結束時間 toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic)) import shutil # 結束時刪除映射文件 try: shutil.rmtree(folder) except: pass
Elapsed time computing the average of couple of slices 0.77 s
???????2.3 內存監(jiān)視
本實例展示不同并行方式的內存消耗情況。
創(chuàng)建內存監(jiān)視器
from psutil import Process from threading import Thread class MemoryMonitor(Thread): """在單獨的線程中監(jiān)控內存使用情況(以MB為單位)。""" def __init__(self): super().__init__() # 調用父類Thread的構造函數(shù) self.stop = False # 用于控制線程停止的標記 self.memory_buffer = [] # 用于存儲內存使用記錄的列表 self.start() # 啟動線程 def get_memory(self): """獲取進程及其子進程的內存使用情況。""" p = Process() # 獲取當前進程 memory = p.memory_info().rss # 獲取當前進程的內存使用量 for c in p.children(): # 遍歷所有子進程 memory += c.memory_info().rss # 累加子進程的內存使用量 return memory def run(self): """線程運行的主體方法,周期性地記錄內存使用情況。""" memory_start = self.get_memory() # 獲取初始內存使用量 while not self.stop: # 當未設置停止標記時循環(huán) self.memory_buffer.append(self.get_memory() - memory_start) # 記錄當前內存使用量與初始內存使用量的差值 time.sleep(0.2) # 休眠0.2秒 def join(self): """重寫join方法,設置停止標記并等待線程結束。""" self.stop = True # 設置停止標記 super().join() # 調用父類方法等待線程結束
并行任務
結果返回list的并行任務:
import time import numpy as np def return_big_object(i): """生成并返回一個大型NumPy數(shù)組對象。""" time.sleep(.1) # 休眠0.1秒模擬耗時操作 return i * np.ones((10000, 200), dtype=np.float64) def accumulator_sum(generator): """累加生成器生成的所有值,并打印進度。""" result = 0 for value in generator: result += value # print(".", end="", flush=True) # 打印點號并刷新輸出 # print("") # 打印換行符 return result from joblib import Parallel, delayed monitor = MemoryMonitor() # 創(chuàng)建內存監(jiān)控器實例,并啟動監(jiān)視 print("Running tasks with return_as='list'...") # 打印啟動任務信息 res = Parallel(n_jobs=2, return_as="list")( delayed(return_big_object)(i) for i in range(150) # 使用joblib的Parallel功能并行執(zhí)行任務 ) res = accumulator_sum(res) # 累加結果 print('All tasks completed and reduced successfully.') # 打印任務完成信息 # 報告內存使用情況 del res # 清理結果以避免內存邊界效應 monitor.join() # 等待內存監(jiān)控線程結束 peak = max(monitor.memory_buffer) / 1e9 # 計算峰值內存使用量,并轉換為GB print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內存使用量
Running tasks with return_as='list'... All tasks completed and reduced successfully. Peak memory usage: 2.44GB
如果改為輸出生成器,那么內存使用量將會大大減少:
monitor_gen = MemoryMonitor() # 創(chuàng)建內存監(jiān)控器實例,并啟動監(jiān)視 print("Running tasks with return_as='generator'...") # 打印啟動任務信息 res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object)(i) for i in range(150) ) res = accumulator_sum(res) # 累加結果 print('All tasks completed and reduced successfully.') # 打印任務完成信息 # 報告內存使用情況 del res # 清理結果以避免內存邊界效應 monitor_gen.join() # 等待內存監(jiān)控線程結束 peak = max(monitor_gen.memory_buffer) / 1e9 # 計算峰值內存使用量,并轉換為GB print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內存使用量
Running tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 0.19GB
下圖展示了以上兩種方法的內存消耗情況,第一種情況涉及到將所有結果存儲在內存中,直到處理完成,這可能導致內存使用量隨著時間線性增長。而第二種情況generator則涉及到流式處理,即結果被實時處理,因此不需要同時在內存中存儲所有結果,從而減少了內存使用的需求:
import matplotlib.pyplot as plt plt.figure(0) plt.semilogy( np.maximum.accumulate(monitor.memory_buffer), label='return_as="list"' ) plt.semilogy( np.maximum.accumulate(monitor_gen.memory_buffer), label='return_as="generator"' ) plt.xlabel("Time") plt.xticks([], []) plt.ylabel("Memory usage") plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB']) plt.legend() plt.show()
進一步節(jié)省內存
前一個例子中的生成器是保持任務提交的順序的。如果某些進程任務提交晚,但比其他任務更早完成。相應的結果會保持在內存中,以等待其他任務完成。如果任務對結果返回順序無要求,例如最后只是對所有結果求和,可以使用generator_unordered減少內存消耗。如下所示:
# 創(chuàng)建一個每個任務耗時可能不同的處理函數(shù) def return_big_object_delayed(i): if (i + 20) % 60: time.sleep(0.1) else: time.sleep(5) return i * np.ones((10000, 200), dtype=np.float64)
返回為generator格式的內存使用:
monitor_delayed_gen = MemoryMonitor() print("Create result generator on delayed tasks with return_as='generator'...") res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object_delayed)(i) for i in range(150) ) res = accumulator_sum(res) print('All tasks completed and reduced successfully.') del res monitor_delayed_gen.join() peak = max(monitor_delayed_gen.memory_buffer) / 1e6 print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 784.23MB
返回為generator_unordered格式的內存使用:
monitor_delayed_gen_unordered = MemoryMonitor() print( "Create result generator on delayed tasks with " "return_as='generator_unordered'..." ) res = Parallel(n_jobs=2, return_as="generator_unordered")( delayed(return_big_object_delayed)(i) for i in range(150) ) res = accumulator_sum(res) print('All tasks completed and reduced successfully.') del res monitor_delayed_gen_unordered.join() peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6 print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator_unordered'... All tasks completed and reduced successfully. Peak memory usage: 175.22MB
內存使用結果對比如下。基于generator_unordered選項在執(zhí)行任務時,能夠獨立地處理每個任務,而不需要依賴于其他任務的完成狀態(tài)。但是要注意的是由于系統(tǒng)負載、后端實現(xiàn)等多種可能影響任務執(zhí)行順序的因素,結果的返回順序是不確定的:
plt.figure(1) plt.semilogy( np.maximum.accumulate(monitor_delayed_gen.memory_buffer), label='return_as="generator"' ) plt.semilogy( np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer), label='return_as="generator_unordered"' ) plt.xlabel("Time") plt.xticks([], []) plt.ylabel("Memory usage") plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB']) plt.legend() plt.show()
3 參考
本文來自博客園,作者:落痕的寒假,轉載請注明原文鏈接:https://www.cnblogs.com/luohenyueji/p/18351959
到此這篇關于Python并行計算庫Joblib使用指北的文章就介紹到這了,更多相關Python Joblib使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Django框架ORM操作數(shù)據(jù)庫不生效問題示例解決方法
本文詳細描述使用Django 的ORM框架操作PostgreSQL數(shù)據(jù)庫刪除不生效問題的定位過程及解決方案,并總結使用ORM框架操作數(shù)據(jù)庫不生效的問題的通用定位方法,感興趣的朋友跟隨小編一起看看吧2023-01-01Python使用Numpy實現(xiàn)Kmeans算法的步驟詳解
將物理或抽象對象的集合分成由類似的對象組成的多個類的過程被稱為聚類。這篇文章主要介紹了Python使用Numpy實現(xiàn)Kmeans算法,需要的朋友可以參考下2021-11-11python實現(xiàn)本地批量ping多個IP的方法示例
這篇文章主要介紹了python實現(xiàn)本地批量ping多個IP的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08Python連接MySQL數(shù)據(jù)庫并查找表信息
本文主要介紹了Python連接MySQL數(shù)據(jù)庫并查找表信息,通過使用Python中的MySQL Connector模塊,連接到MySQL服務器并執(zhí)行SQL查詢語句,可以獲取表的結構、列信息、行數(shù)據(jù)等,感興趣的可以了解一下2023-08-08