Python并行計(jì)算庫(kù)Joblib高效使用指北
Joblib是用于高效并行計(jì)算的Python開源庫(kù),其提供了簡(jiǎn)單易用的內(nèi)存映射和并行計(jì)算的工具,以將任務(wù)分發(fā)到多個(gè)工作進(jìn)程中。Joblib庫(kù)特別適合用于需要進(jìn)行重復(fù)計(jì)算或大規(guī)模數(shù)據(jù)處理的任務(wù)。Joblib庫(kù)的官方倉(cāng)庫(kù)見:joblib,官方文檔見:joblib-doc。
Jolib庫(kù)安裝代碼如下:
pip install joblib
# 查看版本 import joblib joblib.__version__
'1.4.2'
1 使用說(shuō)明
Joblib庫(kù)主要功能涵蓋以下三大塊:
- 記憶模式:Memory類將函數(shù)的返回值緩存到磁盤。下次調(diào)用時(shí),如果輸入?yún)?shù)不變,就直接從緩存中加載結(jié)果,避免重復(fù)計(jì)算。
- 并行計(jì)算:Parallel類將任務(wù)拆分到多個(gè)進(jìn)程或者線程中并行執(zhí)行,加速計(jì)算過程。
- 高效的序列化:針對(duì)NumPy數(shù)組等大型數(shù)據(jù)對(duì)象進(jìn)行了優(yōu)化,且序列化和反序列化速度快。
1.1 Memory類
Joblib庫(kù)的Memory類支持通過記憶模式,將函數(shù)的計(jì)算結(jié)果存儲(chǔ)起來(lái),以便在下次使用時(shí)直接調(diào)用。這種機(jī)制的優(yōu)勢(shì)在于加速計(jì)算過程、節(jié)約資源以及簡(jiǎn)化管理。
Memory類構(gòu)造函數(shù)如下:
class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)
參數(shù)介紹如下:
- location: 緩存文件的存放位置。如果設(shè)置為 None,則不緩存。
- backend: 緩存的后端存儲(chǔ)方式。默認(rèn)是 "local",表示使用本地文件系統(tǒng)。
- mmap_mode: 一個(gè)字符串,表示內(nèi)存映射文件的模式(None, ‘r+’, ‘r’, ‘w+’, ‘c’)。
- compress: 表示是否壓縮緩存文件。壓縮可以節(jié)省磁盤空間,但會(huì)增加 I/O 操作的時(shí)間。
- verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
- bytes_limit: 一個(gè)整數(shù)或 None,表示緩存使用的字節(jié)數(shù)限制。如果緩存超過了這個(gè)限制,最舊的緩存文件將被刪除。
- backend_options: 傳遞給緩存后端的選項(xiàng)。
Memory類簡(jiǎn)單使用
下面代碼展示第一次調(diào)用函數(shù)并緩存結(jié)果:
from joblib import Memory import os, shutil # 創(chuàng)建一個(gè)Memory對(duì)象,指定緩存目錄為當(dāng)前目錄下的run文件夾 # verbose=0表示關(guān)閉詳細(xì)輸出 cachedir = './run' if os.path.exists(cachedir): shutil.rmtree(cachedir) memory = Memory(cachedir, verbose=0) # 使用@memory.cache裝飾器,將函數(shù)f的結(jié)果緩存起來(lái) @memory.cache def f(x): # 只有當(dāng)函數(shù)的輸入?yún)?shù)x沒有被緩存時(shí),才會(huì)執(zhí)行函數(shù)體內(nèi)的代碼 print('Running f(%s)' % x) return x # 第一次調(diào)用f(1),會(huì)執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來(lái) print(f(1))
Running f(1) 1
第二次調(diào)用函數(shù):
# 第二次調(diào)用f(1),由于結(jié)果已經(jīng)被緩存,不會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,而是直接從緩存中讀取結(jié)果 print(f(1))
1
調(diào)用其他函數(shù):
# 調(diào)用f(2),由于輸入?yún)?shù)不同,會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來(lái) print(f(2))
Running f(2) 2
將Memory類應(yīng)用于numpy數(shù)組
import numpy as np from joblib import Memory import os, shutil cachedir = './run' if os.path.exists(cachedir): shutil.rmtree(cachedir) memory = Memory(cachedir, verbose=0) @memory.cache def g(x): print('A long-running calculation, with parameter %s' % x) # 返回漢明窗 return np.hamming(x) @memory.cache def h(x): print('A second long-running calculation, using g(x)') # 生成范德蒙德矩陣 return np.vander(x) # 調(diào)用函數(shù)g,傳入?yún)?shù)3,并將結(jié)果存儲(chǔ)在變量a中 a = g(3) # 打印變量a的值 print(a) # 再次調(diào)用函數(shù)g,傳入相同的參數(shù)3,由于結(jié)果已被緩存,不會(huì)重新計(jì)算 print(g(3))
A long-running calculation, with parameter 3 [0.08 1. 0.08] [0.08 1. 0.08]
直接計(jì)算和緩存結(jié)果是等同的:
# 調(diào)用函數(shù)h,傳入變量a作為參數(shù),并將結(jié)果存儲(chǔ)在變量b中 b = h(a) # 再次調(diào)用函數(shù)h,傳入相同的參數(shù)a,由于結(jié)果已被緩存,不會(huì)重新計(jì)算 b2 = h(a) # 使用numpy的allclose函數(shù)檢查b和b2是否足夠接近,即它們是否相等 print(np.allclose(b, b2))
A second long-running calculation, using g(x) True
直接調(diào)用緩存結(jié)果
import numpy as np from joblib import Memory import os, shutil # 設(shè)置緩存目錄的路徑。 cachedir = './run' # 檢查緩存目錄是否存在。 if os.path.exists(cachedir): # 如果緩存目錄存在,使用shutil.rmtree刪除該目錄及其內(nèi)容。 shutil.rmtree(cachedir) # 初始化Memory對(duì)象,設(shè)置緩存目錄為上面定義的cachedir,mmap_mode設(shè)置為'r',表示只讀模式。 memory = Memory(cachedir, mmap_mode='r', verbose=0) # 使用memory.cache裝飾器緩存np.square函數(shù)的結(jié)果。 square = memory.cache(np.square) a = np.vander(np.arange(3)).astype(float) # 打印通過square函數(shù)處理后的矩陣a。 print(square(a)) # 獲取a的緩存結(jié)果 result = square.call_and_shelve(a) print(result.get()) # 獲取并打印緩存的結(jié)果。
[[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]] [[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]]
類中使用緩存
Memory類不建議將其直接用于類方法。如果想在類中使用緩存,建議的模式是在類中使用單獨(dú)定義的緩存函數(shù),如下所示:
@memory.cache def compute_func(arg1, arg2, arg3): pass class Foo(object): def __init__(self, args): self.data = None def compute(self): # 類中調(diào)用緩存的函數(shù) self.data = compute_func(self.arg1, self.arg2, 40)
1.2 Parallel類
Joblib庫(kù)的Parallel類用于簡(jiǎn)單快速將任務(wù)分解為多個(gè)子任務(wù),并分配到不同的CPU核心或機(jī)器上執(zhí)行,從而顯著提高程序的運(yùn)行效率。
Parallel類構(gòu)造函數(shù)及主要參數(shù)如下:
class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))
參數(shù)介紹如下:
- n_jobs: 指定并行任務(wù)的數(shù)量,為-1時(shí)表示使用所有可用的CPU核心;為None時(shí)表示使用單個(gè)進(jìn)程。
- backend:指定并行化的后端,可選項(xiàng):
- 'loky':使用loky庫(kù)實(shí)現(xiàn)多進(jìn)程,該庫(kù)由joblib開發(fā)者開發(fā),默認(rèn)選項(xiàng)。
- 'threading':使用threading庫(kù)實(shí)現(xiàn)多線程。
- 'multiprocessing':使用multiprocessing庫(kù)實(shí)現(xiàn)多進(jìn)程。
- return_as:返回結(jié)果格式,可選項(xiàng):
- 'list:列表。
- generator:按照任務(wù)提交順序生成結(jié)果的生成器。
- generator_unordered:按照?qǐng)?zhí)行結(jié)果完成先后順序的生成器。
- verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
- timeout:?jiǎn)蝹€(gè)任務(wù)最大運(yùn)行時(shí)長(zhǎng),超時(shí)將引發(fā)TimeOutError。僅適用于n_jobs不為1的情況。
- batch_size:當(dāng)Parallel類執(zhí)行任務(wù)時(shí),會(huì)將任務(wù)分批處理。batch_size參數(shù)決定了每個(gè)批次中包含的任務(wù)數(shù)。
- pre_dispatch: 用來(lái)決定在并行計(jì)算開始之前,每個(gè)批次有多少個(gè)任務(wù)會(huì)被預(yù)先準(zhǔn)備好并等待被分配給單個(gè)工作進(jìn)程。默認(rèn)值為“2*n_jobs”,表示并行計(jì)算時(shí)可以使用2倍工作進(jìn)程的任務(wù)數(shù)量。
- temp_folder:指定臨時(shí)文件的存儲(chǔ)路徑。
- max_nbytes:傳遞給工作程序的數(shù)組大小的閾值。
- require:對(duì)運(yùn)行任務(wù)的要求,可選None和sharedmem。sharedmem表示將使用共享內(nèi)存來(lái)執(zhí)行并行任務(wù),但會(huì)影響計(jì)算性能。
簡(jiǎn)單示例
以下代碼展示了單線程直接運(yùn)行計(jì)算密集型任務(wù)結(jié)果:
from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個(gè)計(jì)算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時(shí)間 time.sleep(1) # 數(shù)值計(jì)算 result = np.sum(np.square(data)) return result # 生成一些模擬數(shù)據(jù) # 設(shè)置隨機(jī)數(shù)生成器的種子 np.random.seed(42) data = np.random.rand(10, 1000) # 10個(gè)1000維的向量 results = [compute_heavy_task(d) for d in data] # 打印結(jié)果的和 print(f"結(jié)果: {sum(results)}") print(f"耗時(shí):{time.time()-start}s")
結(jié)果: 3269.16485027708
耗時(shí):10.101513624191284s
以下代碼展示利用Parallel類創(chuàng)建多進(jìn)程運(yùn)行計(jì)算密集型任務(wù)結(jié)果:
from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個(gè)計(jì)算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時(shí)間 time.sleep(1) # 數(shù)值計(jì)算 result = np.sum(np.square(data)) return result # 設(shè)置隨機(jī)數(shù)生成器的種子 np.random.seed(42) # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個(gè)1000維的向量 # 使用Parallel來(lái)并行執(zhí)行任務(wù) results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data) # 打印結(jié)果的和 print(f"結(jié)果: {sum(results)}") print(f"耗時(shí):{time.time()-start}s")
結(jié)果: 3269.16485027708
耗時(shí):2.381772041320801s
可以看到j(luò)oblib庫(kù)利用多進(jìn)程技術(shù)顯著提高了任務(wù)執(zhí)行的效率。然而,當(dāng)面對(duì)I/O密集型任務(wù)或執(zhí)行時(shí)間極短的任務(wù)時(shí),多線程或多進(jìn)程的優(yōu)勢(shì)可能并不明顯。這是因?yàn)榫€程創(chuàng)建和上下文切換的開銷有時(shí)可能超過任務(wù)本身的執(zhí)行時(shí)間。以上述的compute_heavy_task函數(shù)為例,如果移除了其中的time.sleep函數(shù),多進(jìn)程執(zhí)行所需的時(shí)間將會(huì)顯著增加。
此外獲取當(dāng)前系統(tǒng)的cpu核心數(shù)(邏輯處理器)代碼如下:
import joblib # 獲取當(dāng)前系統(tǒng)的cpu核心數(shù) n_cores = joblib.cpu_count() print(f'系統(tǒng)的核心數(shù)是:{n_cores}')
系統(tǒng)的核心數(shù)是:16
不同并行方式對(duì)比
以下代碼展示了不同并行方式在Parallel類中的應(yīng)用。默認(rèn)使用loky多進(jìn)程:
# 使用loky多進(jìn)程 from joblib import Parallel, delayed import numpy as np import time start = time.time() # 定義一個(gè)計(jì)算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時(shí)間 time.sleep(1) # 數(shù)值計(jì)算 result = np.sum(np.square(data)) return result # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個(gè)1000維的向量 results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data) # 打印結(jié)果的和 print(f"結(jié)果: {sum(results)}") print(f"耗時(shí):{time.time()-start}s")
結(jié)果: 3382.3336437893217
耗時(shí):2.042675256729126s
以下代碼展示了threading多線程的使用,注意由于Python的全局解釋器鎖(GIL)確保在任何時(shí)刻只有一個(gè)線程執(zhí)行Python字節(jié)碼。這表明即使在多核處理器上,Python的線程也無(wú)法實(shí)現(xiàn)真正的并行計(jì)算。然而,當(dāng)涉及到處理I/O密集型任務(wù)或需要快速響應(yīng)的小規(guī)模任務(wù)時(shí),多線程依然具有優(yōu)勢(shì):
# 使用threading多線程 start = time.time() results = Parallel(n_jobs=8, return_as="generator", backend = 'threading')(delayed(compute_heavy_task)(d) for d in data) # 打印結(jié)果的和 print(f"結(jié)果: {sum(results)}") print(f"耗時(shí):{time.time()-start}s")
結(jié)果: 3382.3336437893217
耗時(shí):2.040527105331421s
以下代碼展示了multiprocessing多進(jìn)程的使用,注意Windows下需要將multiprocessing相關(guān)代碼放在main函數(shù)中:
from joblib import Parallel, delayed import numpy as np import time # 定義一個(gè)計(jì)算密集型函數(shù) def compute_heavy_task(data): # 模擬處理時(shí)間 time.sleep(1) # 數(shù)值計(jì)算 result = np.sum(np.square(data)) return result def main(): start = time.time() # 生成一些模擬數(shù)據(jù) data = np.random.rand(10, 1000) # 10個(gè)1000維的向量 # multiprocessing不支持返回rgenerator results = Parallel(n_jobs=8, return_as="list", backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data) # 打印結(jié)果的和 print(f"結(jié)果: {sum(results)}") print(f"耗時(shí):{time.time()-start}s") if __name__ == '__main__': main()
結(jié)果: 3304.6651996375645
耗時(shí):2.4303956031799316s
以下是loky
、threading
和 multiprocessing
的一些關(guān)鍵特性對(duì)比:
特性/庫(kù) | loky | threading | multiprocessing |
---|---|---|---|
適用平臺(tái) | 跨平臺(tái) | 跨平臺(tái) | 跨平臺(tái),但Windows上存在限制 |
進(jìn)程/線程模型 | 進(jìn)程 | 線程 | 進(jìn)程 |
GIL影響 | 無(wú) | 有 | 無(wú) |
適用場(chǎng)景 | CPU密集型任務(wù) | I/O密集型任務(wù) | CPU密集型任務(wù) |
啟動(dòng)開銷 | 較小 | 較小 | 較大 |
內(nèi)存使用 | 較高 | 較低 | 較高 |
進(jìn)程間通信 | 通過管道、隊(duì)列等 | 通過共享數(shù)據(jù)結(jié)構(gòu) | 通過管道、隊(duì)列等 |
線程間通信 | 共享數(shù)據(jù)結(jié)構(gòu) | 共享數(shù)據(jù)結(jié)構(gòu) | 不適用 |
異常處理 | 進(jìn)程間獨(dú)立 | 線程間共享 | 進(jìn)程間獨(dú)立 |
調(diào)試難度 | 較高 | 較低 | 較高 |
適用框架 | 通用 | 通用 | 通用 |
Python中線程和進(jìn)程簡(jiǎn)單對(duì)比如下:
- 資源共享:線程共享同一進(jìn)程的內(nèi)存和資源,而進(jìn)程擁有獨(dú)立的內(nèi)存空間。
- GIL影響:線程受GIL限制,進(jìn)程不受GIL限制。
- 開銷:線程的創(chuàng)建和切換開銷小,進(jìn)程的創(chuàng)建和切換開銷大。
- 適用性:線程適合I/O密集型任務(wù),進(jìn)程適合CPU密集型任務(wù)。
- 通信:線程間通信簡(jiǎn)單但需要處理同步問題,進(jìn)程間通信復(fù)雜但天然隔離。
在實(shí)際應(yīng)用中,選擇使用線程還是進(jìn)程取決于任務(wù)的特性和性能需求。如果任務(wù)主要是I/O密集型,使用線程可以提高性能;如果任務(wù)是CPU密集型,使用進(jìn)程可以更好地利用多核處理器的計(jì)算能力。
共享內(nèi)存
默認(rèn)情況下,Parallel類執(zhí)行任務(wù)時(shí)各個(gè)任務(wù)不共享內(nèi)存,如下所示:
from joblib import Parallel, delayed shared_set = set() def collect(x): shared_set.add(x) Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[]
通過設(shè)置require='sharedmem'可以實(shí)現(xiàn)內(nèi)存共享:
# require='sharedmem'表示需要共享內(nèi)存,以確保多個(gè)進(jìn)程可以訪問shared_set集合 Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[0, 1, 2, 3, 4]
上下文管理器
一些算法需要對(duì)一個(gè)并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,但在循環(huán)中多次調(diào)用joblib.Parallel是次優(yōu)的,因?yàn)檫@將多次創(chuàng)建和銷毀一組工作進(jìn)程,從而導(dǎo)致顯著的性能開銷。
對(duì)于這種情況,使用joblib.Parallel類的上下文管理器API更為高效,可以重用同一組工作進(jìn)程進(jìn)行多次調(diào)用joblib.Parallel對(duì)象。如下所示:
from joblib import Parallel, delayed import math with Parallel(n_jobs=2) as parallel: accumulator = 0. n_iter = 0 while accumulator < 1000: results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5)) accumulator += sum(results) n_iter += 1 print(accumulator, n_iter)
1136.5969161564717 14
parallel_config
Joblib提供parallel_config類用于配置并行執(zhí)行的參數(shù),比如并行的后端類型、批處理大小等,這些配置可以影響后續(xù)所有的parallel實(shí)例。它通常在調(diào)用Parallel類之前使用。關(guān)于parallel_config使用見:parallel_config。
1.3 序列化
joblib.dump()和joblib.load()提供了一種替代pickle庫(kù)的方法,可以高效地序列化處理包含大量數(shù)據(jù)的任意Python對(duì)象,特別是大型的NumPy數(shù)組。關(guān)于pickle庫(kù)使用見:Python數(shù)據(jù)序列化模塊pickle使用筆記 。兩者效果對(duì)比見:
特點(diǎn) | pickle | joblib |
---|---|---|
性能 | 一般 | 針對(duì)NumPy數(shù)組等大數(shù)據(jù)類型有優(yōu)化,通常更快 |
并行處理 | 不支持 | 內(nèi)置并行處理功能,可以加速任務(wù) |
內(nèi)存映射 | 不支持 | 支持內(nèi)存映射,可以高效處理大文件 |
壓縮 | 支持 | 支持壓縮,可以減少存儲(chǔ)空間 |
附加功能 | 少 | 提供了一些額外的功能,如緩存、延遲加載等 |
以下代碼展示了joblib.dump的基本使用:
from tempfile import mkdtemp # 使用mkdtemp創(chuàng)建一個(gè)臨時(shí)目錄,并將目錄路徑存儲(chǔ)在變量savedir中。 savedir = mkdtemp(dir='./') import os # 文件保存路徑 filename = os.path.join(savedir, 'test.joblib') import numpy as np import pandas as pd import joblib # 創(chuàng)建一個(gè)要持久化的字典 to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))] # 使用joblib.dump函數(shù)將to_persist字典序列化并保存到filename指定的文件中 # 注意pickle庫(kù)無(wú)法序列化numpy數(shù)據(jù) joblib.dump(to_persist, filename)
['./tmp82ms1z5w\\test.joblib']
使用joblib.load函數(shù)從指定的文件中加載之前保存的序列化數(shù)據(jù):
joblib.load(filename)
[('a', [1, 2, 3]), ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])), ('c', 0 1 2 3 4 0 1.0 1.0 1.0 1.0 1.0 1 1.0 1.0 1.0 1.0 1.0 2 1.0 1.0 1.0 1.0 1.0 3 1.0 1.0 1.0 1.0 1.0 4 1.0 1.0 1.0 1.0 1.0)]
joblib.dump和joblib.load函數(shù)還接受文件對(duì)象:
with open(filename, 'wb') as fo: # 使用joblib將對(duì)象to_persist序列化并寫入文件 joblib.dump(to_persist, fo) with open(filename, 'rb') as fo: joblib.load(fo)
此外joblib.dump也支持設(shè)置compress參數(shù)以實(shí)現(xiàn)數(shù)據(jù)壓縮:
# compress參數(shù)為壓縮級(jí)別,取值為0到9,值越大壓縮效果越好。為0時(shí)表示不壓縮,默認(rèn)值為0 joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\test.joblib']
默認(rèn)情況下,joblib.dump使用zlib壓縮方法,因?yàn)樗谒俣群痛疟P空間之間實(shí)現(xiàn)了最佳平衡。其他支持的壓縮方法包括“gzip”、“bz2”、“lzma”和“xz”。compress參數(shù)輸入帶有壓縮方法和壓縮級(jí)別就可以選擇不同壓縮方法:
joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3)) joblib.load(filename + '.gz') joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5)) joblib.load(filename + '.bz2')
[('a', [1, 2, 3]), ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])), ('c', 0 1 2 3 4 0 1.0 1.0 1.0 1.0 1.0 1 1.0 1.0 1.0 1.0 1.0 2 1.0 1.0 1.0 1.0 1.0 3 1.0 1.0 1.0 1.0 1.0 4 1.0 1.0 1.0 1.0 1.0)]
除了默認(rèn)壓縮方法,lz4壓縮算法也可以用于數(shù)據(jù)壓縮。前提是需要安裝lz4壓縮庫(kù):
pip install lz4
在這些壓縮方法中,lz4和默認(rèn)方法效果較好。lz4使用方式與其他壓縮方式一樣:
joblib.dump(to_persist, filename, compress=('lz4', 3))
['./tmp82ms1z5w\\test.joblib']
???????2 實(shí)例
2.1 joblib緩存和并行
本實(shí)例展示了利用joblib緩存和并行來(lái)加速任務(wù)執(zhí)行。以下代碼展示了一個(gè)高耗時(shí)任務(wù):
# 導(dǎo)入time模塊,用于實(shí)現(xiàn)延時(shí)功能 import time # 定義一個(gè)模擬耗時(shí)計(jì)算的函數(shù) def costly_compute(data, column): # 休眠1秒,模擬耗時(shí)操作 time.sleep(1) # 返回傳入數(shù)據(jù)的指定列 return data[column] # 定義一個(gè)計(jì)算數(shù)據(jù)列平均值的函數(shù) def data_processing_mean(data, column): # 調(diào)用costly_compute函數(shù)獲取指定列的數(shù)據(jù) column_data = costly_compute(data, column) # 計(jì)算并返回該列數(shù)據(jù)的平均值 return column_data.mean() # 導(dǎo)入numpy庫(kù),并設(shè)置隨機(jī)數(shù)生成器的種子,以保證結(jié)果的可復(fù)現(xiàn)性 import numpy as np rng = np.random.RandomState(42) # 生成1000行4列的隨機(jī)數(shù)據(jù)矩陣 data = rng.randn(int(1000), 4) # 記錄開始時(shí)間 start = time.time() # 對(duì)數(shù)據(jù)的每一列計(jì)算平均值,并將結(jié)果存儲(chǔ)在results列表中 results = [data_processing_mean(data, col) for col in range(data.shape[1])] # 記錄結(jié)束時(shí)間 stop = time.time() # 打印處理過程的描述信息 print('\nSequential processing') # 打印整個(gè)處理過程的耗時(shí) print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
Sequential processing Elapsed time for the entire processing: 4.05 s
下段代碼演示了如何使用joblib庫(kù)來(lái)緩存和并行化計(jì)算上述任務(wù):
# 導(dǎo)入time模塊,用于模擬耗時(shí)操作。 import time # 定義一個(gè)使用緩存的函數(shù),用于計(jì)算數(shù)據(jù)的均值。 def data_processing_mean_using_cache(data, column): return costly_compute_cached(data, column).mean() # 從joblib庫(kù)導(dǎo)入Memory類,用于緩存函數(shù)的輸出。 from joblib import Memory # 設(shè)置緩存的存儲(chǔ)位置和詳細(xì)程度 location = './cachedir' memory = Memory(location, verbose=0) # 使用Memory對(duì)象的cache方法來(lái)緩存costly_compute函數(shù)的輸出。 costly_compute_cached = memory.cache(costly_compute) # 從joblib庫(kù)導(dǎo)入Parallel和delayed類,用于并行執(zhí)行函數(shù)。 from joblib import Parallel, delayed # 記錄開始時(shí)間。 start = time.time() # 使用Parallel類并行執(zhí)行data_processing_mean_using_cache函數(shù),對(duì)數(shù)據(jù)的每一列進(jìn)行處理。 results = Parallel(n_jobs=2)( delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])) # 記錄結(jié)束時(shí)間。 stop = time.time() # 打印第一輪處理的耗時(shí)信息,包括緩存數(shù)據(jù)的時(shí)間。 print('\nFirst round - caching the data') print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
First round - caching the data Elapsed time for the entire processing: 2.05 s
再次執(zhí)行相同的過程,可以看到結(jié)果被緩存而不是重新執(zhí)行函數(shù):
start = time.time() results = Parallel(n_jobs=2)( delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])) stop = time.time() print('\nSecond round - reloading from the cache') print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start)) # 如果不想使用緩存結(jié)果,可以清除緩存信息 memory.clear(warn=False)
Second round - reloading from the cache Elapsed time for the entire processing: 0.02 s
???????2.2 序列化
以下示例展示了在joblib.Parallel中使用序列化內(nèi)存映射(numpy.memmap)。內(nèi)存映射可以將大型數(shù)據(jù)集分割成小塊,并在需要時(shí)將其加載到內(nèi)存中。這種方法可以減少內(nèi)存使用,并提高處理速度。
定義耗時(shí)函數(shù):
import numpy as np data = np.random.random((int(1e7),)) window_size = int(5e5) slices = [slice(start, start + window_size) for start in range(0, data.size - window_size, int(1e5))] import time def slow_mean(data, sl): time.sleep(0.01) return data[sl].mean()
以下代碼是直接調(diào)用函數(shù)的運(yùn)行結(jié)果:
tic = time.time() results = [slow_mean(data, sl) for sl in slices] toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.49 s
以下代碼是調(diào)用Parallel類2個(gè)進(jìn)程運(yùn)行的結(jié)果,由于整體任務(wù)計(jì)算耗時(shí)較少。所以Parallel類并行計(jì)算并沒有比直接調(diào)用函數(shù)有太多速度優(yōu)勢(shì),因?yàn)檫M(jìn)程啟動(dòng)銷毀需要額外時(shí)間:
from joblib import Parallel, delayed tic = time.time() results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices) toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.00 s
以下代碼提供了joblib.dump和load函數(shù)加速數(shù)據(jù)讀取。其中dump函數(shù)用于將data對(duì)象序列化并保存到磁盤上的文件中,同時(shí)創(chuàng)建了一個(gè)內(nèi)存映射,使得該文件可以像內(nèi)存數(shù)組一樣被訪問。當(dāng)程序再次加載這個(gè)文件時(shí),可以使用load函數(shù)以內(nèi)存映射模式打開:
import os from joblib import dump, load # 從joblib庫(kù)導(dǎo)入dump和load函數(shù),用于創(chuàng)建和加載內(nèi)存映射文件 # 設(shè)置內(nèi)存映射文件的文件夾路徑 folder = './memmap' os.makedirs(folder, exist_ok = True) # 將內(nèi)存映射文件的名稱與路徑結(jié)合 data_filename_memmap = os.path.join(folder, 'data_memmap.joblib') # 使用dump函數(shù)將數(shù)據(jù)對(duì)象'data'保存到內(nèi)存映射文件 dump(data, data_filename_memmap) # 使用load函數(shù)加載內(nèi)存映射文件,mmap_mode='r'表示以只讀模式打開 data_ = load(data_filename_memmap, mmap_mode='r') # 記錄開始時(shí)間 tic = time.time() results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices) # 記錄結(jié)束時(shí)間 toc = time.time() print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic)) import shutil # 結(jié)束時(shí)刪除映射文件 try: shutil.rmtree(folder) except: pass
Elapsed time computing the average of couple of slices 0.77 s
???????2.3 內(nèi)存監(jiān)視
本實(shí)例展示不同并行方式的內(nèi)存消耗情況。
創(chuàng)建內(nèi)存監(jiān)視器
from psutil import Process from threading import Thread class MemoryMonitor(Thread): """在單獨(dú)的線程中監(jiān)控內(nèi)存使用情況(以MB為單位)。""" def __init__(self): super().__init__() # 調(diào)用父類Thread的構(gòu)造函數(shù) self.stop = False # 用于控制線程停止的標(biāo)記 self.memory_buffer = [] # 用于存儲(chǔ)內(nèi)存使用記錄的列表 self.start() # 啟動(dòng)線程 def get_memory(self): """獲取進(jìn)程及其子進(jìn)程的內(nèi)存使用情況。""" p = Process() # 獲取當(dāng)前進(jìn)程 memory = p.memory_info().rss # 獲取當(dāng)前進(jìn)程的內(nèi)存使用量 for c in p.children(): # 遍歷所有子進(jìn)程 memory += c.memory_info().rss # 累加子進(jìn)程的內(nèi)存使用量 return memory def run(self): """線程運(yùn)行的主體方法,周期性地記錄內(nèi)存使用情況。""" memory_start = self.get_memory() # 獲取初始內(nèi)存使用量 while not self.stop: # 當(dāng)未設(shè)置停止標(biāo)記時(shí)循環(huán) self.memory_buffer.append(self.get_memory() - memory_start) # 記錄當(dāng)前內(nèi)存使用量與初始內(nèi)存使用量的差值 time.sleep(0.2) # 休眠0.2秒 def join(self): """重寫join方法,設(shè)置停止標(biāo)記并等待線程結(jié)束。""" self.stop = True # 設(shè)置停止標(biāo)記 super().join() # 調(diào)用父類方法等待線程結(jié)束
并行任務(wù)
結(jié)果返回list的并行任務(wù):
import time import numpy as np def return_big_object(i): """生成并返回一個(gè)大型NumPy數(shù)組對(duì)象。""" time.sleep(.1) # 休眠0.1秒模擬耗時(shí)操作 return i * np.ones((10000, 200), dtype=np.float64) def accumulator_sum(generator): """累加生成器生成的所有值,并打印進(jìn)度。""" result = 0 for value in generator: result += value # print(".", end="", flush=True) # 打印點(diǎn)號(hào)并刷新輸出 # print("") # 打印換行符 return result from joblib import Parallel, delayed monitor = MemoryMonitor() # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視 print("Running tasks with return_as='list'...") # 打印啟動(dòng)任務(wù)信息 res = Parallel(n_jobs=2, return_as="list")( delayed(return_big_object)(i) for i in range(150) # 使用joblib的Parallel功能并行執(zhí)行任務(wù) ) res = accumulator_sum(res) # 累加結(jié)果 print('All tasks completed and reduced successfully.') # 打印任務(wù)完成信息 # 報(bào)告內(nèi)存使用情況 del res # 清理結(jié)果以避免內(nèi)存邊界效應(yīng) monitor.join() # 等待內(nèi)存監(jiān)控線程結(jié)束 peak = max(monitor.memory_buffer) / 1e9 # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內(nèi)存使用量
Running tasks with return_as='list'... All tasks completed and reduced successfully. Peak memory usage: 2.44GB
如果改為輸出生成器,那么內(nèi)存使用量將會(huì)大大減少:
monitor_gen = MemoryMonitor() # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視 print("Running tasks with return_as='generator'...") # 打印啟動(dòng)任務(wù)信息 res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object)(i) for i in range(150) ) res = accumulator_sum(res) # 累加結(jié)果 print('All tasks completed and reduced successfully.') # 打印任務(wù)完成信息 # 報(bào)告內(nèi)存使用情況 del res # 清理結(jié)果以避免內(nèi)存邊界效應(yīng) monitor_gen.join() # 等待內(nèi)存監(jiān)控線程結(jié)束 peak = max(monitor_gen.memory_buffer) / 1e9 # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內(nèi)存使用量
Running tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 0.19GB
下圖展示了以上兩種方法的內(nèi)存消耗情況,第一種情況涉及到將所有結(jié)果存儲(chǔ)在內(nèi)存中,直到處理完成,這可能導(dǎo)致內(nèi)存使用量隨著時(shí)間線性增長(zhǎng)。而第二種情況generator則涉及到流式處理,即結(jié)果被實(shí)時(shí)處理,因此不需要同時(shí)在內(nèi)存中存儲(chǔ)所有結(jié)果,從而減少了內(nèi)存使用的需求:
import matplotlib.pyplot as plt plt.figure(0) plt.semilogy( np.maximum.accumulate(monitor.memory_buffer), label='return_as="list"' ) plt.semilogy( np.maximum.accumulate(monitor_gen.memory_buffer), label='return_as="generator"' ) plt.xlabel("Time") plt.xticks([], []) plt.ylabel("Memory usage") plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB']) plt.legend() plt.show()
進(jìn)一步節(jié)省內(nèi)存
前一個(gè)例子中的生成器是保持任務(wù)提交的順序的。如果某些進(jìn)程任務(wù)提交晚,但比其他任務(wù)更早完成。相應(yīng)的結(jié)果會(huì)保持在內(nèi)存中,以等待其他任務(wù)完成。如果任務(wù)對(duì)結(jié)果返回順序無(wú)要求,例如最后只是對(duì)所有結(jié)果求和,可以使用generator_unordered減少內(nèi)存消耗。如下所示:
# 創(chuàng)建一個(gè)每個(gè)任務(wù)耗時(shí)可能不同的處理函數(shù) def return_big_object_delayed(i): if (i + 20) % 60: time.sleep(0.1) else: time.sleep(5) return i * np.ones((10000, 200), dtype=np.float64)
返回為generator格式的內(nèi)存使用:
monitor_delayed_gen = MemoryMonitor() print("Create result generator on delayed tasks with return_as='generator'...") res = Parallel(n_jobs=2, return_as="generator")( delayed(return_big_object_delayed)(i) for i in range(150) ) res = accumulator_sum(res) print('All tasks completed and reduced successfully.') del res monitor_delayed_gen.join() peak = max(monitor_delayed_gen.memory_buffer) / 1e6 print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 784.23MB
返回為generator_unordered格式的內(nèi)存使用:
monitor_delayed_gen_unordered = MemoryMonitor() print( "Create result generator on delayed tasks with " "return_as='generator_unordered'..." ) res = Parallel(n_jobs=2, return_as="generator_unordered")( delayed(return_big_object_delayed)(i) for i in range(150) ) res = accumulator_sum(res) print('All tasks completed and reduced successfully.') del res monitor_delayed_gen_unordered.join() peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6 print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator_unordered'... All tasks completed and reduced successfully. Peak memory usage: 175.22MB
內(nèi)存使用結(jié)果對(duì)比如下?;趃enerator_unordered選項(xiàng)在執(zhí)行任務(wù)時(shí),能夠獨(dú)立地處理每個(gè)任務(wù),而不需要依賴于其他任務(wù)的完成狀態(tài)。但是要注意的是由于系統(tǒng)負(fù)載、后端實(shí)現(xiàn)等多種可能影響任務(wù)執(zhí)行順序的因素,結(jié)果的返回順序是不確定的:
plt.figure(1) plt.semilogy( np.maximum.accumulate(monitor_delayed_gen.memory_buffer), label='return_as="generator"' ) plt.semilogy( np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer), label='return_as="generator_unordered"' ) plt.xlabel("Time") plt.xticks([], []) plt.ylabel("Memory usage") plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB']) plt.legend() plt.show()
3 參考
本文來(lái)自博客園,作者:落痕的寒假,轉(zhuǎn)載請(qǐng)注明原文鏈接:https://www.cnblogs.com/luohenyueji/p/18351959
到此這篇關(guān)于Python并行計(jì)算庫(kù)Joblib使用指北的文章就介紹到這了,更多相關(guān)Python Joblib使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Python使用Joblib模塊實(shí)現(xiàn)加快任務(wù)處理速度
- python利用joblib進(jìn)行并行數(shù)據(jù)處理的代碼示例
- Python的joblib模型固化函數(shù)解析
- Python并行庫(kù)joblib之delayed函數(shù)與Parallel函數(shù)詳解
- Python中的Joblib庫(kù)使用學(xué)習(xí)總結(jié)
- Python中的joblib模塊詳解
- python如何利用joblib保存訓(xùn)練模型
- Python Joblib庫(kù)使用方法案例總結(jié)
- Python高效計(jì)算庫(kù)Joblib的入門教程
相關(guān)文章
Django框架ORM操作數(shù)據(jù)庫(kù)不生效問題示例解決方法
本文詳細(xì)描述使用Django 的ORM框架操作PostgreSQL數(shù)據(jù)庫(kù)刪除不生效問題的定位過程及解決方案,并總結(jié)使用ORM框架操作數(shù)據(jù)庫(kù)不生效的問題的通用定位方法,感興趣的朋友跟隨小編一起看看吧2023-01-01如何在Python中將字符串轉(zhuǎn)換為數(shù)組詳解
最近在用Python,做一個(gè)小腳本,有個(gè)操作就是要把內(nèi)容換成數(shù)組對(duì)象再進(jìn)行相關(guān)操作,下面這篇文章主要給大家介紹了關(guān)于如何在Python中將字符串轉(zhuǎn)換為數(shù)組的相關(guān)資料,需要的朋友可以參考下2022-12-12Python使用Numpy實(shí)現(xiàn)Kmeans算法的步驟詳解
將物理或抽象對(duì)象的集合分成由類似的對(duì)象組成的多個(gè)類的過程被稱為聚類。這篇文章主要介紹了Python使用Numpy實(shí)現(xiàn)Kmeans算法,需要的朋友可以參考下2021-11-11python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例
這篇文章主要介紹了python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08django數(shù)據(jù)庫(kù)自動(dòng)重連的方法實(shí)例
這篇文章主要給大家介紹了關(guān)于django數(shù)據(jù)庫(kù)自動(dòng)重連的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用django具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07Python連接MySQL數(shù)據(jù)庫(kù)并查找表信息
本文主要介紹了Python連接MySQL數(shù)據(jù)庫(kù)并查找表信息,通過使用Python中的MySQL Connector模塊,連接到MySQL服務(wù)器并執(zhí)行SQL查詢語(yǔ)句,可以獲取表的結(jié)構(gòu)、列信息、行數(shù)據(jù)等,感興趣的可以了解一下2023-08-08