Python并行計(jì)算庫Joblib高效使用指北
Joblib是用于高效并行計(jì)算的Python開源庫,其提供了簡(jiǎn)單易用的內(nèi)存映射和并行計(jì)算的工具,以將任務(wù)分發(fā)到多個(gè)工作進(jìn)程中。Joblib庫特別適合用于需要進(jìn)行重復(fù)計(jì)算或大規(guī)模數(shù)據(jù)處理的任務(wù)。Joblib庫的官方倉(cāng)庫見:joblib,官方文檔見:joblib-doc。
Jolib庫安裝代碼如下:
pip install joblib
# 查看版本 import joblib joblib.__version__
'1.4.2'
1 使用說明
Joblib庫主要功能涵蓋以下三大塊:
- 記憶模式:Memory類將函數(shù)的返回值緩存到磁盤。下次調(diào)用時(shí),如果輸入?yún)?shù)不變,就直接從緩存中加載結(jié)果,避免重復(fù)計(jì)算。
- 并行計(jì)算:Parallel類將任務(wù)拆分到多個(gè)進(jìn)程或者線程中并行執(zhí)行,加速計(jì)算過程。
- 高效的序列化:針對(duì)NumPy數(shù)組等大型數(shù)據(jù)對(duì)象進(jìn)行了優(yōu)化,且序列化和反序列化速度快。
1.1 Memory類
Joblib庫的Memory類支持通過記憶模式,將函數(shù)的計(jì)算結(jié)果存儲(chǔ)起來,以便在下次使用時(shí)直接調(diào)用。這種機(jī)制的優(yōu)勢(shì)在于加速計(jì)算過程、節(jié)約資源以及簡(jiǎn)化管理。
Memory類構(gòu)造函數(shù)如下:
class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)
參數(shù)介紹如下:
- location: 緩存文件的存放位置。如果設(shè)置為 None,則不緩存。
- backend: 緩存的后端存儲(chǔ)方式。默認(rèn)是 "local",表示使用本地文件系統(tǒng)。
- mmap_mode: 一個(gè)字符串,表示內(nèi)存映射文件的模式(None, ‘r+’, ‘r’, ‘w+’, ‘c’)。
- compress: 表示是否壓縮緩存文件。壓縮可以節(jié)省磁盤空間,但會(huì)增加 I/O 操作的時(shí)間。
- verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
- bytes_limit: 一個(gè)整數(shù)或 None,表示緩存使用的字節(jié)數(shù)限制。如果緩存超過了這個(gè)限制,最舊的緩存文件將被刪除。
- backend_options: 傳遞給緩存后端的選項(xiàng)。
Memory類簡(jiǎn)單使用
下面代碼展示第一次調(diào)用函數(shù)并緩存結(jié)果:
from joblib import Memory
import os, shutil
# 創(chuàng)建一個(gè)Memory對(duì)象,指定緩存目錄為當(dāng)前目錄下的run文件夾
# verbose=0表示關(guān)閉詳細(xì)輸出
cachedir = './run'
if os.path.exists(cachedir):
shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
# 使用@memory.cache裝飾器,將函數(shù)f的結(jié)果緩存起來
@memory.cache
def f(x):
# 只有當(dāng)函數(shù)的輸入?yún)?shù)x沒有被緩存時(shí),才會(huì)執(zhí)行函數(shù)體內(nèi)的代碼
print('Running f(%s)' % x)
return x
# 第一次調(diào)用f(1),會(huì)執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來
print(f(1))Running f(1) 1
第二次調(diào)用函數(shù):
# 第二次調(diào)用f(1),由于結(jié)果已經(jīng)被緩存,不會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,而是直接從緩存中讀取結(jié)果 print(f(1))
1
調(diào)用其他函數(shù):
# 調(diào)用f(2),由于輸入?yún)?shù)不同,會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來 print(f(2))
Running f(2) 2
將Memory類應(yīng)用于numpy數(shù)組
import numpy as np
from joblib import Memory
import os, shutil
cachedir = './run'
if os.path.exists(cachedir):
shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
@memory.cache
def g(x):
print('A long-running calculation, with parameter %s' % x)
# 返回漢明窗
return np.hamming(x)
@memory.cache
def h(x):
print('A second long-running calculation, using g(x)')
# 生成范德蒙德矩陣
return np.vander(x)
# 調(diào)用函數(shù)g,傳入?yún)?shù)3,并將結(jié)果存儲(chǔ)在變量a中
a = g(3)
# 打印變量a的值
print(a)
# 再次調(diào)用函數(shù)g,傳入相同的參數(shù)3,由于結(jié)果已被緩存,不會(huì)重新計(jì)算
print(g(3))A long-running calculation, with parameter 3 [0.08 1. 0.08] [0.08 1. 0.08]
直接計(jì)算和緩存結(jié)果是等同的:
# 調(diào)用函數(shù)h,傳入變量a作為參數(shù),并將結(jié)果存儲(chǔ)在變量b中 b = h(a) # 再次調(diào)用函數(shù)h,傳入相同的參數(shù)a,由于結(jié)果已被緩存,不會(huì)重新計(jì)算 b2 = h(a) # 使用numpy的allclose函數(shù)檢查b和b2是否足夠接近,即它們是否相等 print(np.allclose(b, b2))
A second long-running calculation, using g(x) True
直接調(diào)用緩存結(jié)果
import numpy as np
from joblib import Memory
import os, shutil
# 設(shè)置緩存目錄的路徑。
cachedir = './run'
# 檢查緩存目錄是否存在。
if os.path.exists(cachedir):
# 如果緩存目錄存在,使用shutil.rmtree刪除該目錄及其內(nèi)容。
shutil.rmtree(cachedir)
# 初始化Memory對(duì)象,設(shè)置緩存目錄為上面定義的cachedir,mmap_mode設(shè)置為'r',表示只讀模式。
memory = Memory(cachedir, mmap_mode='r', verbose=0)
# 使用memory.cache裝飾器緩存np.square函數(shù)的結(jié)果。
square = memory.cache(np.square)
a = np.vander(np.arange(3)).astype(float)
# 打印通過square函數(shù)處理后的矩陣a。
print(square(a))
# 獲取a的緩存結(jié)果
result = square.call_and_shelve(a)
print(result.get()) # 獲取并打印緩存的結(jié)果。[[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]] [[ 0. 0. 1.] [ 1. 1. 1.] [16. 4. 1.]]
類中使用緩存
Memory類不建議將其直接用于類方法。如果想在類中使用緩存,建議的模式是在類中使用單獨(dú)定義的緩存函數(shù),如下所示:
@memory.cache
def compute_func(arg1, arg2, arg3):
pass
class Foo(object):
def __init__(self, args):
self.data = None
def compute(self):
# 類中調(diào)用緩存的函數(shù)
self.data = compute_func(self.arg1, self.arg2, 40)1.2 Parallel類
Joblib庫的Parallel類用于簡(jiǎn)單快速將任務(wù)分解為多個(gè)子任務(wù),并分配到不同的CPU核心或機(jī)器上執(zhí)行,從而顯著提高程序的運(yùn)行效率。
Parallel類構(gòu)造函數(shù)及主要參數(shù)如下:
class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))參數(shù)介紹如下:
- n_jobs: 指定并行任務(wù)的數(shù)量,為-1時(shí)表示使用所有可用的CPU核心;為None時(shí)表示使用單個(gè)進(jìn)程。
- backend:指定并行化的后端,可選項(xiàng):
- 'loky':使用loky庫實(shí)現(xiàn)多進(jìn)程,該庫由joblib開發(fā)者開發(fā),默認(rèn)選項(xiàng)。
- 'threading':使用threading庫實(shí)現(xiàn)多線程。
- 'multiprocessing':使用multiprocessing庫實(shí)現(xiàn)多進(jìn)程。
- return_as:返回結(jié)果格式,可選項(xiàng):
- 'list:列表。
- generator:按照任務(wù)提交順序生成結(jié)果的生成器。
- generator_unordered:按照?qǐng)?zhí)行結(jié)果完成先后順序的生成器。
- verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
- timeout:?jiǎn)蝹€(gè)任務(wù)最大運(yùn)行時(shí)長(zhǎng),超時(shí)將引發(fā)TimeOutError。僅適用于n_jobs不為1的情況。
- batch_size:當(dāng)Parallel類執(zhí)行任務(wù)時(shí),會(huì)將任務(wù)分批處理。batch_size參數(shù)決定了每個(gè)批次中包含的任務(wù)數(shù)。
- pre_dispatch: 用來決定在并行計(jì)算開始之前,每個(gè)批次有多少個(gè)任務(wù)會(huì)被預(yù)先準(zhǔn)備好并等待被分配給單個(gè)工作進(jìn)程。默認(rèn)值為“2*n_jobs”,表示并行計(jì)算時(shí)可以使用2倍工作進(jìn)程的任務(wù)數(shù)量。
- temp_folder:指定臨時(shí)文件的存儲(chǔ)路徑。
- max_nbytes:傳遞給工作程序的數(shù)組大小的閾值。
- require:對(duì)運(yùn)行任務(wù)的要求,可選None和sharedmem。sharedmem表示將使用共享內(nèi)存來執(zhí)行并行任務(wù),但會(huì)影響計(jì)算性能。
簡(jiǎn)單示例
以下代碼展示了單線程直接運(yùn)行計(jì)算密集型任務(wù)結(jié)果:
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
# 模擬處理時(shí)間
time.sleep(1)
# 數(shù)值計(jì)算
result = np.sum(np.square(data))
return result
# 生成一些模擬數(shù)據(jù)
# 設(shè)置隨機(jī)數(shù)生成器的種子
np.random.seed(42)
data = np.random.rand(10, 1000) # 10個(gè)1000維的向量
results = [compute_heavy_task(d) for d in data]
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")結(jié)果: 3269.16485027708
耗時(shí):10.101513624191284s
以下代碼展示利用Parallel類創(chuàng)建多進(jìn)程運(yùn)行計(jì)算密集型任務(wù)結(jié)果:
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
# 模擬處理時(shí)間
time.sleep(1)
# 數(shù)值計(jì)算
result = np.sum(np.square(data))
return result
# 設(shè)置隨機(jī)數(shù)生成器的種子
np.random.seed(42)
# 生成一些模擬數(shù)據(jù)
data = np.random.rand(10, 1000) # 10個(gè)1000維的向量
# 使用Parallel來并行執(zhí)行任務(wù)
results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")結(jié)果: 3269.16485027708
耗時(shí):2.381772041320801s
可以看到j(luò)oblib庫利用多進(jìn)程技術(shù)顯著提高了任務(wù)執(zhí)行的效率。然而,當(dāng)面對(duì)I/O密集型任務(wù)或執(zhí)行時(shí)間極短的任務(wù)時(shí),多線程或多進(jìn)程的優(yōu)勢(shì)可能并不明顯。這是因?yàn)榫€程創(chuàng)建和上下文切換的開銷有時(shí)可能超過任務(wù)本身的執(zhí)行時(shí)間。以上述的compute_heavy_task函數(shù)為例,如果移除了其中的time.sleep函數(shù),多進(jìn)程執(zhí)行所需的時(shí)間將會(huì)顯著增加。
此外獲取當(dāng)前系統(tǒng)的cpu核心數(shù)(邏輯處理器)代碼如下:
import joblib
# 獲取當(dāng)前系統(tǒng)的cpu核心數(shù)
n_cores = joblib.cpu_count()
print(f'系統(tǒng)的核心數(shù)是:{n_cores}')系統(tǒng)的核心數(shù)是:16
不同并行方式對(duì)比
以下代碼展示了不同并行方式在Parallel類中的應(yīng)用。默認(rèn)使用loky多進(jìn)程:
# 使用loky多進(jìn)程
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
# 模擬處理時(shí)間
time.sleep(1)
# 數(shù)值計(jì)算
result = np.sum(np.square(data))
return result
# 生成一些模擬數(shù)據(jù)
data = np.random.rand(10, 1000) # 10個(gè)1000維的向量
results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")結(jié)果: 3382.3336437893217
耗時(shí):2.042675256729126s
以下代碼展示了threading多線程的使用,注意由于Python的全局解釋器鎖(GIL)確保在任何時(shí)刻只有一個(gè)線程執(zhí)行Python字節(jié)碼。這表明即使在多核處理器上,Python的線程也無法實(shí)現(xiàn)真正的并行計(jì)算。然而,當(dāng)涉及到處理I/O密集型任務(wù)或需要快速響應(yīng)的小規(guī)模任務(wù)時(shí),多線程依然具有優(yōu)勢(shì):
# 使用threading多線程
start = time.time()
results = Parallel(n_jobs=8, return_as="generator", backend = 'threading')(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")結(jié)果: 3382.3336437893217
耗時(shí):2.040527105331421s
以下代碼展示了multiprocessing多進(jìn)程的使用,注意Windows下需要將multiprocessing相關(guān)代碼放在main函數(shù)中:
from joblib import Parallel, delayed
import numpy as np
import time
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
# 模擬處理時(shí)間
time.sleep(1)
# 數(shù)值計(jì)算
result = np.sum(np.square(data))
return result
def main():
start = time.time()
# 生成一些模擬數(shù)據(jù)
data = np.random.rand(10, 1000) # 10個(gè)1000維的向量
# multiprocessing不支持返回rgenerator
results = Parallel(n_jobs=8, return_as="list", backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")
if __name__ == '__main__':
main()結(jié)果: 3304.6651996375645
耗時(shí):2.4303956031799316s
以下是loky、threading和 multiprocessing的一些關(guān)鍵特性對(duì)比:
| 特性/庫 | loky | threading | multiprocessing |
|---|---|---|---|
| 適用平臺(tái) | 跨平臺(tái) | 跨平臺(tái) | 跨平臺(tái),但Windows上存在限制 |
| 進(jìn)程/線程模型 | 進(jìn)程 | 線程 | 進(jìn)程 |
| GIL影響 | 無 | 有 | 無 |
| 適用場(chǎng)景 | CPU密集型任務(wù) | I/O密集型任務(wù) | CPU密集型任務(wù) |
| 啟動(dòng)開銷 | 較小 | 較小 | 較大 |
| 內(nèi)存使用 | 較高 | 較低 | 較高 |
| 進(jìn)程間通信 | 通過管道、隊(duì)列等 | 通過共享數(shù)據(jù)結(jié)構(gòu) | 通過管道、隊(duì)列等 |
| 線程間通信 | 共享數(shù)據(jù)結(jié)構(gòu) | 共享數(shù)據(jù)結(jié)構(gòu) | 不適用 |
| 異常處理 | 進(jìn)程間獨(dú)立 | 線程間共享 | 進(jìn)程間獨(dú)立 |
| 調(diào)試難度 | 較高 | 較低 | 較高 |
| 適用框架 | 通用 | 通用 | 通用 |
Python中線程和進(jìn)程簡(jiǎn)單對(duì)比如下:
- 資源共享:線程共享同一進(jìn)程的內(nèi)存和資源,而進(jìn)程擁有獨(dú)立的內(nèi)存空間。
- GIL影響:線程受GIL限制,進(jìn)程不受GIL限制。
- 開銷:線程的創(chuàng)建和切換開銷小,進(jìn)程的創(chuàng)建和切換開銷大。
- 適用性:線程適合I/O密集型任務(wù),進(jìn)程適合CPU密集型任務(wù)。
- 通信:線程間通信簡(jiǎn)單但需要處理同步問題,進(jìn)程間通信復(fù)雜但天然隔離。
在實(shí)際應(yīng)用中,選擇使用線程還是進(jìn)程取決于任務(wù)的特性和性能需求。如果任務(wù)主要是I/O密集型,使用線程可以提高性能;如果任務(wù)是CPU密集型,使用進(jìn)程可以更好地利用多核處理器的計(jì)算能力。
共享內(nèi)存
默認(rèn)情況下,Parallel類執(zhí)行任務(wù)時(shí)各個(gè)任務(wù)不共享內(nèi)存,如下所示:
from joblib import Parallel, delayed shared_set = set() def collect(x): shared_set.add(x) Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[]
通過設(shè)置require='sharedmem'可以實(shí)現(xiàn)內(nèi)存共享:
# require='sharedmem'表示需要共享內(nèi)存,以確保多個(gè)進(jìn)程可以訪問shared_set集合 Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5)) print(sorted(shared_set))
[0, 1, 2, 3, 4]
上下文管理器
一些算法需要對(duì)一個(gè)并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,但在循環(huán)中多次調(diào)用joblib.Parallel是次優(yōu)的,因?yàn)檫@將多次創(chuàng)建和銷毀一組工作進(jìn)程,從而導(dǎo)致顯著的性能開銷。
對(duì)于這種情況,使用joblib.Parallel類的上下文管理器API更為高效,可以重用同一組工作進(jìn)程進(jìn)行多次調(diào)用joblib.Parallel對(duì)象。如下所示:
from joblib import Parallel, delayed
import math
with Parallel(n_jobs=2) as parallel:
accumulator = 0.
n_iter = 0
while accumulator < 1000:
results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5))
accumulator += sum(results)
n_iter += 1
print(accumulator, n_iter) 1136.5969161564717 14
parallel_config
Joblib提供parallel_config類用于配置并行執(zhí)行的參數(shù),比如并行的后端類型、批處理大小等,這些配置可以影響后續(xù)所有的parallel實(shí)例。它通常在調(diào)用Parallel類之前使用。關(guān)于parallel_config使用見:parallel_config。
1.3 序列化
joblib.dump()和joblib.load()提供了一種替代pickle庫的方法,可以高效地序列化處理包含大量數(shù)據(jù)的任意Python對(duì)象,特別是大型的NumPy數(shù)組。關(guān)于pickle庫使用見:Python數(shù)據(jù)序列化模塊pickle使用筆記 。兩者效果對(duì)比見:
| 特點(diǎn) | pickle | joblib |
|---|---|---|
| 性能 | 一般 | 針對(duì)NumPy數(shù)組等大數(shù)據(jù)類型有優(yōu)化,通常更快 |
| 并行處理 | 不支持 | 內(nèi)置并行處理功能,可以加速任務(wù) |
| 內(nèi)存映射 | 不支持 | 支持內(nèi)存映射,可以高效處理大文件 |
| 壓縮 | 支持 | 支持壓縮,可以減少存儲(chǔ)空間 |
| 附加功能 | 少 | 提供了一些額外的功能,如緩存、延遲加載等 |
以下代碼展示了joblib.dump的基本使用:
from tempfile import mkdtemp
# 使用mkdtemp創(chuàng)建一個(gè)臨時(shí)目錄,并將目錄路徑存儲(chǔ)在變量savedir中。
savedir = mkdtemp(dir='./')
import os
# 文件保存路徑
filename = os.path.join(savedir, 'test.joblib')
import numpy as np
import pandas as pd
import joblib
# 創(chuàng)建一個(gè)要持久化的字典
to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))]
# 使用joblib.dump函數(shù)將to_persist字典序列化并保存到filename指定的文件中
# 注意pickle庫無法序列化numpy數(shù)據(jù)
joblib.dump(to_persist, filename)['./tmp82ms1z5w\\test.joblib']
使用joblib.load函數(shù)從指定的文件中加載之前保存的序列化數(shù)據(jù):
joblib.load(filename)
[('a', [1, 2, 3]),
('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
('c',
0 1 2 3 4
0 1.0 1.0 1.0 1.0 1.0
1 1.0 1.0 1.0 1.0 1.0
2 1.0 1.0 1.0 1.0 1.0
3 1.0 1.0 1.0 1.0 1.0
4 1.0 1.0 1.0 1.0 1.0)]joblib.dump和joblib.load函數(shù)還接受文件對(duì)象:
with open(filename, 'wb') as fo:
# 使用joblib將對(duì)象to_persist序列化并寫入文件
joblib.dump(to_persist, fo)
with open(filename, 'rb') as fo:
joblib.load(fo)此外joblib.dump也支持設(shè)置compress參數(shù)以實(shí)現(xiàn)數(shù)據(jù)壓縮:
# compress參數(shù)為壓縮級(jí)別,取值為0到9,值越大壓縮效果越好。為0時(shí)表示不壓縮,默認(rèn)值為0 joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\test.joblib']
默認(rèn)情況下,joblib.dump使用zlib壓縮方法,因?yàn)樗谒俣群痛疟P空間之間實(shí)現(xiàn)了最佳平衡。其他支持的壓縮方法包括“gzip”、“bz2”、“lzma”和“xz”。compress參數(shù)輸入帶有壓縮方法和壓縮級(jí)別就可以選擇不同壓縮方法:
joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3))
joblib.load(filename + '.gz')
joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5))
joblib.load(filename + '.bz2')[('a', [1, 2, 3]),
('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
('c',
0 1 2 3 4
0 1.0 1.0 1.0 1.0 1.0
1 1.0 1.0 1.0 1.0 1.0
2 1.0 1.0 1.0 1.0 1.0
3 1.0 1.0 1.0 1.0 1.0
4 1.0 1.0 1.0 1.0 1.0)]除了默認(rèn)壓縮方法,lz4壓縮算法也可以用于數(shù)據(jù)壓縮。前提是需要安裝lz4壓縮庫:
pip install lz4
在這些壓縮方法中,lz4和默認(rèn)方法效果較好。lz4使用方式與其他壓縮方式一樣:
joblib.dump(to_persist, filename, compress=('lz4', 3)) ['./tmp82ms1z5w\\test.joblib']
???????2 實(shí)例
2.1 joblib緩存和并行
本實(shí)例展示了利用joblib緩存和并行來加速任務(wù)執(zhí)行。以下代碼展示了一個(gè)高耗時(shí)任務(wù):
# 導(dǎo)入time模塊,用于實(shí)現(xiàn)延時(shí)功能
import time
# 定義一個(gè)模擬耗時(shí)計(jì)算的函數(shù)
def costly_compute(data, column):
# 休眠1秒,模擬耗時(shí)操作
time.sleep(1)
# 返回傳入數(shù)據(jù)的指定列
return data[column]
# 定義一個(gè)計(jì)算數(shù)據(jù)列平均值的函數(shù)
def data_processing_mean(data, column):
# 調(diào)用costly_compute函數(shù)獲取指定列的數(shù)據(jù)
column_data = costly_compute(data, column)
# 計(jì)算并返回該列數(shù)據(jù)的平均值
return column_data.mean()
# 導(dǎo)入numpy庫,并設(shè)置隨機(jī)數(shù)生成器的種子,以保證結(jié)果的可復(fù)現(xiàn)性
import numpy as np
rng = np.random.RandomState(42)
# 生成1000行4列的隨機(jī)數(shù)據(jù)矩陣
data = rng.randn(int(1000), 4)
# 記錄開始時(shí)間
start = time.time()
# 對(duì)數(shù)據(jù)的每一列計(jì)算平均值,并將結(jié)果存儲(chǔ)在results列表中
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
# 記錄結(jié)束時(shí)間
stop = time.time()
# 打印處理過程的描述信息
print('\nSequential processing')
# 打印整個(gè)處理過程的耗時(shí)
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))Sequential processing Elapsed time for the entire processing: 4.05 s
下段代碼演示了如何使用joblib庫來緩存和并行化計(jì)算上述任務(wù):
# 導(dǎo)入time模塊,用于模擬耗時(shí)操作。
import time
# 定義一個(gè)使用緩存的函數(shù),用于計(jì)算數(shù)據(jù)的均值。
def data_processing_mean_using_cache(data, column):
return costly_compute_cached(data, column).mean()
# 從joblib庫導(dǎo)入Memory類,用于緩存函數(shù)的輸出。
from joblib import Memory
# 設(shè)置緩存的存儲(chǔ)位置和詳細(xì)程度
location = './cachedir'
memory = Memory(location, verbose=0)
# 使用Memory對(duì)象的cache方法來緩存costly_compute函數(shù)的輸出。
costly_compute_cached = memory.cache(costly_compute)
# 從joblib庫導(dǎo)入Parallel和delayed類,用于并行執(zhí)行函數(shù)。
from joblib import Parallel, delayed
# 記錄開始時(shí)間。
start = time.time()
# 使用Parallel類并行執(zhí)行data_processing_mean_using_cache函數(shù),對(duì)數(shù)據(jù)的每一列進(jìn)行處理。
results = Parallel(n_jobs=2)(
delayed(data_processing_mean_using_cache)(data, col)
for col in range(data.shape[1]))
# 記錄結(jié)束時(shí)間。
stop = time.time()
# 打印第一輪處理的耗時(shí)信息,包括緩存數(shù)據(jù)的時(shí)間。
print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))First round - caching the data Elapsed time for the entire processing: 2.05 s
再次執(zhí)行相同的過程,可以看到結(jié)果被緩存而不是重新執(zhí)行函數(shù):
start = time.time()
results = Parallel(n_jobs=2)(
delayed(data_processing_mean_using_cache)(data, col)
for col in range(data.shape[1]))
stop = time.time()
print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
# 如果不想使用緩存結(jié)果,可以清除緩存信息
memory.clear(warn=False)Second round - reloading from the cache Elapsed time for the entire processing: 0.02 s
???????2.2 序列化
以下示例展示了在joblib.Parallel中使用序列化內(nèi)存映射(numpy.memmap)。內(nèi)存映射可以將大型數(shù)據(jù)集分割成小塊,并在需要時(shí)將其加載到內(nèi)存中。這種方法可以減少內(nèi)存使用,并提高處理速度。
定義耗時(shí)函數(shù):
import numpy as np
data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
for start in range(0, data.size - window_size, int(1e5))]
import time
def slow_mean(data, sl):
time.sleep(0.01)
return data[sl].mean()以下代碼是直接調(diào)用函數(shù)的運(yùn)行結(jié)果:
tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))Elapsed time computing the average of couple of slices 1.49 s
以下代碼是調(diào)用Parallel類2個(gè)進(jìn)程運(yùn)行的結(jié)果,由于整體任務(wù)計(jì)算耗時(shí)較少。所以Parallel類并行計(jì)算并沒有比直接調(diào)用函數(shù)有太多速度優(yōu)勢(shì),因?yàn)檫M(jìn)程啟動(dòng)銷毀需要額外時(shí)間:
from joblib import Parallel, delayed
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))Elapsed time computing the average of couple of slices 1.00 s
以下代碼提供了joblib.dump和load函數(shù)加速數(shù)據(jù)讀取。其中dump函數(shù)用于將data對(duì)象序列化并保存到磁盤上的文件中,同時(shí)創(chuàng)建了一個(gè)內(nèi)存映射,使得該文件可以像內(nèi)存數(shù)組一樣被訪問。當(dāng)程序再次加載這個(gè)文件時(shí),可以使用load函數(shù)以內(nèi)存映射模式打開:
import os
from joblib import dump, load # 從joblib庫導(dǎo)入dump和load函數(shù),用于創(chuàng)建和加載內(nèi)存映射文件
# 設(shè)置內(nèi)存映射文件的文件夾路徑
folder = './memmap'
os.makedirs(folder, exist_ok = True)
# 將內(nèi)存映射文件的名稱與路徑結(jié)合
data_filename_memmap = os.path.join(folder, 'data_memmap.joblib')
# 使用dump函數(shù)將數(shù)據(jù)對(duì)象'data'保存到內(nèi)存映射文件
dump(data, data_filename_memmap)
# 使用load函數(shù)加載內(nèi)存映射文件,mmap_mode='r'表示以只讀模式打開
data_ = load(data_filename_memmap, mmap_mode='r')
# 記錄開始時(shí)間
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices)
# 記錄結(jié)束時(shí)間
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic))
import shutil
# 結(jié)束時(shí)刪除映射文件
try:
shutil.rmtree(folder)
except:
passElapsed time computing the average of couple of slices 0.77 s
???????2.3 內(nèi)存監(jiān)視
本實(shí)例展示不同并行方式的內(nèi)存消耗情況。
創(chuàng)建內(nèi)存監(jiān)視器
from psutil import Process
from threading import Thread
class MemoryMonitor(Thread):
"""在單獨(dú)的線程中監(jiān)控內(nèi)存使用情況(以MB為單位)。"""
def __init__(self):
super().__init__() # 調(diào)用父類Thread的構(gòu)造函數(shù)
self.stop = False # 用于控制線程停止的標(biāo)記
self.memory_buffer = [] # 用于存儲(chǔ)內(nèi)存使用記錄的列表
self.start() # 啟動(dòng)線程
def get_memory(self):
"""獲取進(jìn)程及其子進(jìn)程的內(nèi)存使用情況。"""
p = Process() # 獲取當(dāng)前進(jìn)程
memory = p.memory_info().rss # 獲取當(dāng)前進(jìn)程的內(nèi)存使用量
for c in p.children(): # 遍歷所有子進(jìn)程
memory += c.memory_info().rss # 累加子進(jìn)程的內(nèi)存使用量
return memory
def run(self):
"""線程運(yùn)行的主體方法,周期性地記錄內(nèi)存使用情況。"""
memory_start = self.get_memory() # 獲取初始內(nèi)存使用量
while not self.stop: # 當(dāng)未設(shè)置停止標(biāo)記時(shí)循環(huán)
self.memory_buffer.append(self.get_memory() - memory_start) # 記錄當(dāng)前內(nèi)存使用量與初始內(nèi)存使用量的差值
time.sleep(0.2) # 休眠0.2秒
def join(self):
"""重寫join方法,設(shè)置停止標(biāo)記并等待線程結(jié)束。"""
self.stop = True # 設(shè)置停止標(biāo)記
super().join() # 調(diào)用父類方法等待線程結(jié)束并行任務(wù)
結(jié)果返回list的并行任務(wù):
import time
import numpy as np
def return_big_object(i):
"""生成并返回一個(gè)大型NumPy數(shù)組對(duì)象。"""
time.sleep(.1) # 休眠0.1秒模擬耗時(shí)操作
return i * np.ones((10000, 200), dtype=np.float64)
def accumulator_sum(generator):
"""累加生成器生成的所有值,并打印進(jìn)度。"""
result = 0
for value in generator:
result += value
# print(".", end="", flush=True) # 打印點(diǎn)號(hào)并刷新輸出
# print("") # 打印換行符
return result
from joblib import Parallel, delayed
monitor = MemoryMonitor() # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視
print("Running tasks with return_as='list'...") # 打印啟動(dòng)任務(wù)信息
res = Parallel(n_jobs=2, return_as="list")(
delayed(return_big_object)(i) for i in range(150) # 使用joblib的Parallel功能并行執(zhí)行任務(wù)
)
res = accumulator_sum(res) # 累加結(jié)果
print('All tasks completed and reduced successfully.') # 打印任務(wù)完成信息
# 報(bào)告內(nèi)存使用情況
del res # 清理結(jié)果以避免內(nèi)存邊界效應(yīng)
monitor.join() # 等待內(nèi)存監(jiān)控線程結(jié)束
peak = max(monitor.memory_buffer) / 1e9 # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB
print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內(nèi)存使用量Running tasks with return_as='list'... All tasks completed and reduced successfully. Peak memory usage: 2.44GB
如果改為輸出生成器,那么內(nèi)存使用量將會(huì)大大減少:
monitor_gen = MemoryMonitor() # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視
print("Running tasks with return_as='generator'...") # 打印啟動(dòng)任務(wù)信息
res = Parallel(n_jobs=2, return_as="generator")(
delayed(return_big_object)(i) for i in range(150)
)
res = accumulator_sum(res) # 累加結(jié)果
print('All tasks completed and reduced successfully.') # 打印任務(wù)完成信息
# 報(bào)告內(nèi)存使用情況
del res # 清理結(jié)果以避免內(nèi)存邊界效應(yīng)
monitor_gen.join() # 等待內(nèi)存監(jiān)控線程結(jié)束
peak = max(monitor_gen.memory_buffer) / 1e9 # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB
print(f"Peak memory usage: {peak:.2f}GB") # 打印峰值內(nèi)存使用量Running tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 0.19GB
下圖展示了以上兩種方法的內(nèi)存消耗情況,第一種情況涉及到將所有結(jié)果存儲(chǔ)在內(nèi)存中,直到處理完成,這可能導(dǎo)致內(nèi)存使用量隨著時(shí)間線性增長(zhǎng)。而第二種情況generator則涉及到流式處理,即結(jié)果被實(shí)時(shí)處理,因此不需要同時(shí)在內(nèi)存中存儲(chǔ)所有結(jié)果,從而減少了內(nèi)存使用的需求:
import matplotlib.pyplot as plt
plt.figure(0)
plt.semilogy(
np.maximum.accumulate(monitor.memory_buffer),
label='return_as="list"'
)
plt.semilogy(
np.maximum.accumulate(monitor_gen.memory_buffer),
label='return_as="generator"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()
進(jìn)一步節(jié)省內(nèi)存
前一個(gè)例子中的生成器是保持任務(wù)提交的順序的。如果某些進(jìn)程任務(wù)提交晚,但比其他任務(wù)更早完成。相應(yīng)的結(jié)果會(huì)保持在內(nèi)存中,以等待其他任務(wù)完成。如果任務(wù)對(duì)結(jié)果返回順序無要求,例如最后只是對(duì)所有結(jié)果求和,可以使用generator_unordered減少內(nèi)存消耗。如下所示:
# 創(chuàng)建一個(gè)每個(gè)任務(wù)耗時(shí)可能不同的處理函數(shù)
def return_big_object_delayed(i):
if (i + 20) % 60:
time.sleep(0.1)
else:
time.sleep(5)
return i * np.ones((10000, 200), dtype=np.float64)返回為generator格式的內(nèi)存使用:
monitor_delayed_gen = MemoryMonitor()
print("Create result generator on delayed tasks with return_as='generator'...")
res = Parallel(n_jobs=2, return_as="generator")(
delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res
monitor_delayed_gen.join()
peak = max(monitor_delayed_gen.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")Create result generator on delayed tasks with return_as='generator'... All tasks completed and reduced successfully. Peak memory usage: 784.23MB
返回為generator_unordered格式的內(nèi)存使用:
monitor_delayed_gen_unordered = MemoryMonitor()
print(
"Create result generator on delayed tasks with "
"return_as='generator_unordered'..."
)
res = Parallel(n_jobs=2, return_as="generator_unordered")(
delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res
monitor_delayed_gen_unordered.join()
peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")Create result generator on delayed tasks with return_as='generator_unordered'... All tasks completed and reduced successfully. Peak memory usage: 175.22MB
內(nèi)存使用結(jié)果對(duì)比如下。基于generator_unordered選項(xiàng)在執(zhí)行任務(wù)時(shí),能夠獨(dú)立地處理每個(gè)任務(wù),而不需要依賴于其他任務(wù)的完成狀態(tài)。但是要注意的是由于系統(tǒng)負(fù)載、后端實(shí)現(xiàn)等多種可能影響任務(wù)執(zhí)行順序的因素,結(jié)果的返回順序是不確定的:
plt.figure(1)
plt.semilogy(
np.maximum.accumulate(monitor_delayed_gen.memory_buffer),
label='return_as="generator"'
)
plt.semilogy(
np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer),
label='return_as="generator_unordered"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()
3 參考
本文來自博客園,作者:落痕的寒假,轉(zhuǎn)載請(qǐng)注明原文鏈接:https://www.cnblogs.com/luohenyueji/p/18351959
到此這篇關(guān)于Python并行計(jì)算庫Joblib使用指北的文章就介紹到這了,更多相關(guān)Python Joblib使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Django框架ORM操作數(shù)據(jù)庫不生效問題示例解決方法
本文詳細(xì)描述使用Django 的ORM框架操作PostgreSQL數(shù)據(jù)庫刪除不生效問題的定位過程及解決方案,并總結(jié)使用ORM框架操作數(shù)據(jù)庫不生效的問題的通用定位方法,感興趣的朋友跟隨小編一起看看吧2023-01-01
如何在Python中將字符串轉(zhuǎn)換為數(shù)組詳解
最近在用Python,做一個(gè)小腳本,有個(gè)操作就是要把內(nèi)容換成數(shù)組對(duì)象再進(jìn)行相關(guān)操作,下面這篇文章主要給大家介紹了關(guān)于如何在Python中將字符串轉(zhuǎn)換為數(shù)組的相關(guān)資料,需要的朋友可以參考下2022-12-12
Python使用Numpy實(shí)現(xiàn)Kmeans算法的步驟詳解
將物理或抽象對(duì)象的集合分成由類似的對(duì)象組成的多個(gè)類的過程被稱為聚類。這篇文章主要介紹了Python使用Numpy實(shí)現(xiàn)Kmeans算法,需要的朋友可以參考下2021-11-11
python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例
這篇文章主要介紹了python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08
django數(shù)據(jù)庫自動(dòng)重連的方法實(shí)例
這篇文章主要給大家介紹了關(guān)于django數(shù)據(jù)庫自動(dòng)重連的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用django具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07
Python連接MySQL數(shù)據(jù)庫并查找表信息
本文主要介紹了Python連接MySQL數(shù)據(jù)庫并查找表信息,通過使用Python中的MySQL Connector模塊,連接到MySQL服務(wù)器并執(zhí)行SQL查詢語句,可以獲取表的結(jié)構(gòu)、列信息、行數(shù)據(jù)等,感興趣的可以了解一下2023-08-08

