欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python并行計(jì)算庫(kù)Joblib高效使用指北

 更新時(shí)間:2024年08月10日 10:03:40   作者:落痕的寒假  
Joblib是用于高效并行計(jì)算的Python開源庫(kù),其提供了簡(jiǎn)單易用的內(nèi)存映射和并行計(jì)算的工具,以將任務(wù)分發(fā)到多個(gè)工作進(jìn)程中,這篇文章主要介紹了Python并行計(jì)算庫(kù)Joblib使用指北,需要的朋友可以參考下

Joblib是用于高效并行計(jì)算的Python開源庫(kù),其提供了簡(jiǎn)單易用的內(nèi)存映射和并行計(jì)算的工具,以將任務(wù)分發(fā)到多個(gè)工作進(jìn)程中。Joblib庫(kù)特別適合用于需要進(jìn)行重復(fù)計(jì)算或大規(guī)模數(shù)據(jù)處理的任務(wù)。Joblib庫(kù)的官方倉(cāng)庫(kù)見:joblib,官方文檔見:joblib-doc。

Jolib庫(kù)安裝代碼如下:

pip install joblib

# 查看版本
import joblib
joblib.__version__

'1.4.2'

1 使用說(shuō)明

Joblib庫(kù)主要功能涵蓋以下三大塊:

  • 記憶模式:Memory類將函數(shù)的返回值緩存到磁盤。下次調(diào)用時(shí),如果輸入?yún)?shù)不變,就直接從緩存中加載結(jié)果,避免重復(fù)計(jì)算。
  • 并行計(jì)算:Parallel類將任務(wù)拆分到多個(gè)進(jìn)程或者線程中并行執(zhí)行,加速計(jì)算過程。
  • 高效的序列化:針對(duì)NumPy數(shù)組等大型數(shù)據(jù)對(duì)象進(jìn)行了優(yōu)化,且序列化和反序列化速度快。

1.1 Memory類

Joblib庫(kù)的Memory類支持通過記憶模式,將函數(shù)的計(jì)算結(jié)果存儲(chǔ)起來(lái),以便在下次使用時(shí)直接調(diào)用。這種機(jī)制的優(yōu)勢(shì)在于加速計(jì)算過程、節(jié)約資源以及簡(jiǎn)化管理。

Memory類構(gòu)造函數(shù)如下:

class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)

參數(shù)介紹如下:

  • location: 緩存文件的存放位置。如果設(shè)置為 None,則不緩存。
  • backend: 緩存的后端存儲(chǔ)方式。默認(rèn)是 "local",表示使用本地文件系統(tǒng)。
  • mmap_mode: 一個(gè)字符串,表示內(nèi)存映射文件的模式(None, ‘r+’, ‘r’, ‘w+’, ‘c’)。
  • compress: 表示是否壓縮緩存文件。壓縮可以節(jié)省磁盤空間,但會(huì)增加 I/O 操作的時(shí)間。
  • verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
  • bytes_limit: 一個(gè)整數(shù)或 None,表示緩存使用的字節(jié)數(shù)限制。如果緩存超過了這個(gè)限制,最舊的緩存文件將被刪除。
  • backend_options: 傳遞給緩存后端的選項(xiàng)。

Memory類簡(jiǎn)單使用

下面代碼展示第一次調(diào)用函數(shù)并緩存結(jié)果:

from joblib import Memory
import os, shutil
# 創(chuàng)建一個(gè)Memory對(duì)象,指定緩存目錄為當(dāng)前目錄下的run文件夾
# verbose=0表示關(guān)閉詳細(xì)輸出
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
# 使用@memory.cache裝飾器,將函數(shù)f的結(jié)果緩存起來(lái)
@memory.cache
def f(x):
    # 只有當(dāng)函數(shù)的輸入?yún)?shù)x沒有被緩存時(shí),才會(huì)執(zhí)行函數(shù)體內(nèi)的代碼
    print('Running f(%s)' % x)
    return x
# 第一次調(diào)用f(1),會(huì)執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來(lái)
print(f(1))
Running f(1)
1

第二次調(diào)用函數(shù):

# 第二次調(diào)用f(1),由于結(jié)果已經(jīng)被緩存,不會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,而是直接從緩存中讀取結(jié)果
print(f(1))

1

調(diào)用其他函數(shù):

# 調(diào)用f(2),由于輸入?yún)?shù)不同,會(huì)再次執(zhí)行函數(shù)體內(nèi)的代碼,并將結(jié)果緩存起來(lái)
print(f(2))
Running f(2)
2

將Memory類應(yīng)用于numpy數(shù)組

import numpy as np
from joblib import Memory
import os, shutil
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
@memory.cache
def g(x):
    print('A long-running calculation, with parameter %s' % x)
    # 返回漢明窗
    return np.hamming(x)
@memory.cache
def h(x):
    print('A second long-running calculation, using g(x)')
    # 生成范德蒙德矩陣
    return np.vander(x)
# 調(diào)用函數(shù)g,傳入?yún)?shù)3,并將結(jié)果存儲(chǔ)在變量a中
a = g(3)
# 打印變量a的值
print(a)
# 再次調(diào)用函數(shù)g,傳入相同的參數(shù)3,由于結(jié)果已被緩存,不會(huì)重新計(jì)算
print(g(3))
A long-running calculation, with parameter 3
[0.08 1.   0.08]
[0.08 1.   0.08]

直接計(jì)算和緩存結(jié)果是等同的:

# 調(diào)用函數(shù)h,傳入變量a作為參數(shù),并將結(jié)果存儲(chǔ)在變量b中
b = h(a)
# 再次調(diào)用函數(shù)h,傳入相同的參數(shù)a,由于結(jié)果已被緩存,不會(huì)重新計(jì)算
b2 = h(a)
# 使用numpy的allclose函數(shù)檢查b和b2是否足夠接近,即它們是否相等
print(np.allclose(b, b2))
A second long-running calculation, using g(x)
True

直接調(diào)用緩存結(jié)果

import numpy as np
from joblib import Memory
import os, shutil
# 設(shè)置緩存目錄的路徑。
cachedir = './run'
# 檢查緩存目錄是否存在。
if os.path.exists(cachedir):
    # 如果緩存目錄存在,使用shutil.rmtree刪除該目錄及其內(nèi)容。
    shutil.rmtree(cachedir)
# 初始化Memory對(duì)象,設(shè)置緩存目錄為上面定義的cachedir,mmap_mode設(shè)置為'r',表示只讀模式。
memory = Memory(cachedir, mmap_mode='r', verbose=0)
# 使用memory.cache裝飾器緩存np.square函數(shù)的結(jié)果。
square = memory.cache(np.square)
a = np.vander(np.arange(3)).astype(float)
# 打印通過square函數(shù)處理后的矩陣a。
print(square(a))
# 獲取a的緩存結(jié)果
result = square.call_and_shelve(a)
print(result.get())  # 獲取并打印緩存的結(jié)果。
[[ 0.  0.  1.]
 [ 1.  1.  1.]
 [16.  4.  1.]]
[[ 0.  0.  1.]
 [ 1.  1.  1.]
 [16.  4.  1.]]

類中使用緩存

Memory類不建議將其直接用于類方法。如果想在類中使用緩存,建議的模式是在類中使用單獨(dú)定義的緩存函數(shù),如下所示:

@memory.cache
def compute_func(arg1, arg2, arg3):
    pass
class Foo(object):
    def __init__(self, args):
        self.data = None
    def compute(self):
        # 類中調(diào)用緩存的函數(shù)
        self.data = compute_func(self.arg1, self.arg2, 40)

1.2 Parallel類

Joblib庫(kù)的Parallel類用于簡(jiǎn)單快速將任務(wù)分解為多個(gè)子任務(wù),并分配到不同的CPU核心或機(jī)器上執(zhí)行,從而顯著提高程序的運(yùn)行效率。

Parallel類構(gòu)造函數(shù)及主要參數(shù)如下:

class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))

參數(shù)介紹如下:

  • n_jobs: 指定并行任務(wù)的數(shù)量,為-1時(shí)表示使用所有可用的CPU核心;為None時(shí)表示使用單個(gè)進(jìn)程。
  • backend:指定并行化的后端,可選項(xiàng):
    • 'loky':使用loky庫(kù)實(shí)現(xiàn)多進(jìn)程,該庫(kù)由joblib開發(fā)者開發(fā),默認(rèn)選項(xiàng)。
    • 'threading':使用threading庫(kù)實(shí)現(xiàn)多線程。
    • 'multiprocessing':使用multiprocessing庫(kù)實(shí)現(xiàn)多進(jìn)程。
  • return_as:返回結(jié)果格式,可選項(xiàng):
    • 'list:列表。
    • generator:按照任務(wù)提交順序生成結(jié)果的生成器。
    • generator_unordered:按照?qǐng)?zhí)行結(jié)果完成先后順序的生成器。
  • verbose: 一個(gè)整數(shù),表示日志的詳細(xì)程度。0 表示沒有輸出,1 表示只輸出警告,2 表示輸出信息,3 表示輸出調(diào)試信息。
  • timeout:?jiǎn)蝹€(gè)任務(wù)最大運(yùn)行時(shí)長(zhǎng),超時(shí)將引發(fā)TimeOutError。僅適用于n_jobs不為1的情況。
  • batch_size:當(dāng)Parallel類執(zhí)行任務(wù)時(shí),會(huì)將任務(wù)分批處理。batch_size參數(shù)決定了每個(gè)批次中包含的任務(wù)數(shù)。
  • pre_dispatch: 用來(lái)決定在并行計(jì)算開始之前,每個(gè)批次有多少個(gè)任務(wù)會(huì)被預(yù)先準(zhǔn)備好并等待被分配給單個(gè)工作進(jìn)程。默認(rèn)值為“2*n_jobs”,表示并行計(jì)算時(shí)可以使用2倍工作進(jìn)程的任務(wù)數(shù)量。
  • temp_folder:指定臨時(shí)文件的存儲(chǔ)路徑。
  • max_nbytes:傳遞給工作程序的數(shù)組大小的閾值。
  • require:對(duì)運(yùn)行任務(wù)的要求,可選None和sharedmem。sharedmem表示將使用共享內(nèi)存來(lái)執(zhí)行并行任務(wù),但會(huì)影響計(jì)算性能。

簡(jiǎn)單示例

以下代碼展示了單線程直接運(yùn)行計(jì)算密集型任務(wù)結(jié)果:

from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
    # 模擬處理時(shí)間
    time.sleep(1)
    # 數(shù)值計(jì)算
    result = np.sum(np.square(data))
    return result
# 生成一些模擬數(shù)據(jù)
# 設(shè)置隨機(jī)數(shù)生成器的種子
np.random.seed(42) 
data = np.random.rand(10, 1000)  # 10個(gè)1000維的向量
results = [compute_heavy_task(d) for d in data]
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")

結(jié)果: 3269.16485027708
耗時(shí):10.101513624191284s

以下代碼展示利用Parallel類創(chuàng)建多進(jìn)程運(yùn)行計(jì)算密集型任務(wù)結(jié)果:

from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
    # 模擬處理時(shí)間
    time.sleep(1)
    # 數(shù)值計(jì)算
    result = np.sum(np.square(data))
    return result
# 設(shè)置隨機(jī)數(shù)生成器的種子
np.random.seed(42) 
# 生成一些模擬數(shù)據(jù)
data = np.random.rand(10, 1000)  # 10個(gè)1000維的向量
# 使用Parallel來(lái)并行執(zhí)行任務(wù)
results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")

結(jié)果: 3269.16485027708
耗時(shí):2.381772041320801s

可以看到j(luò)oblib庫(kù)利用多進(jìn)程技術(shù)顯著提高了任務(wù)執(zhí)行的效率。然而,當(dāng)面對(duì)I/O密集型任務(wù)或執(zhí)行時(shí)間極短的任務(wù)時(shí),多線程或多進(jìn)程的優(yōu)勢(shì)可能并不明顯。這是因?yàn)榫€程創(chuàng)建和上下文切換的開銷有時(shí)可能超過任務(wù)本身的執(zhí)行時(shí)間。以上述的compute_heavy_task函數(shù)為例,如果移除了其中的time.sleep函數(shù),多進(jìn)程執(zhí)行所需的時(shí)間將會(huì)顯著增加。

此外獲取當(dāng)前系統(tǒng)的cpu核心數(shù)(邏輯處理器)代碼如下:

import joblib
# 獲取當(dāng)前系統(tǒng)的cpu核心數(shù)
n_cores = joblib.cpu_count()
print(f'系統(tǒng)的核心數(shù)是:{n_cores}')

系統(tǒng)的核心數(shù)是:16

不同并行方式對(duì)比

以下代碼展示了不同并行方式在Parallel類中的應(yīng)用。默認(rèn)使用loky多進(jìn)程:

# 使用loky多進(jìn)程
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
    # 模擬處理時(shí)間
    time.sleep(1)
    # 數(shù)值計(jì)算
    result = np.sum(np.square(data))
    return result
# 生成一些模擬數(shù)據(jù)
data = np.random.rand(10, 1000)  # 10個(gè)1000維的向量
results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")

結(jié)果: 3382.3336437893217
耗時(shí):2.042675256729126s

以下代碼展示了threading多線程的使用,注意由于Python的全局解釋器鎖(GIL)確保在任何時(shí)刻只有一個(gè)線程執(zhí)行Python字節(jié)碼。這表明即使在多核處理器上,Python的線程也無(wú)法實(shí)現(xiàn)真正的并行計(jì)算。然而,當(dāng)涉及到處理I/O密集型任務(wù)或需要快速響應(yīng)的小規(guī)模任務(wù)時(shí),多線程依然具有優(yōu)勢(shì):

# 使用threading多線程
start = time.time()
results = Parallel(n_jobs=8, return_as="generator", backend = 'threading')(delayed(compute_heavy_task)(d) for d in data)
# 打印結(jié)果的和
print(f"結(jié)果: {sum(results)}")
print(f"耗時(shí):{time.time()-start}s")

結(jié)果: 3382.3336437893217
耗時(shí):2.040527105331421s

以下代碼展示了multiprocessing多進(jìn)程的使用,注意Windows下需要將multiprocessing相關(guān)代碼放在main函數(shù)中:

from joblib import Parallel, delayed
import numpy as np
import time
# 定義一個(gè)計(jì)算密集型函數(shù)
def compute_heavy_task(data):
    # 模擬處理時(shí)間
    time.sleep(1)
    # 數(shù)值計(jì)算
    result = np.sum(np.square(data))
    return result
def main():
    start = time.time()
    # 生成一些模擬數(shù)據(jù)
    data = np.random.rand(10, 1000)  # 10個(gè)1000維的向量
    # multiprocessing不支持返回rgenerator
    results = Parallel(n_jobs=8, return_as="list",  backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data)
    # 打印結(jié)果的和
    print(f"結(jié)果: {sum(results)}")
    print(f"耗時(shí):{time.time()-start}s")
if __name__ == '__main__':
    main()

結(jié)果: 3304.6651996375645
耗時(shí):2.4303956031799316s

以下是loky、threadingmultiprocessing的一些關(guān)鍵特性對(duì)比:

特性/庫(kù)lokythreadingmultiprocessing
適用平臺(tái)跨平臺(tái)跨平臺(tái)跨平臺(tái),但Windows上存在限制
進(jìn)程/線程模型進(jìn)程線程進(jìn)程
GIL影響無(wú)無(wú)
適用場(chǎng)景CPU密集型任務(wù)I/O密集型任務(wù)CPU密集型任務(wù)
啟動(dòng)開銷較小較小較大
內(nèi)存使用較高較低較高
進(jìn)程間通信通過管道、隊(duì)列等通過共享數(shù)據(jù)結(jié)構(gòu)通過管道、隊(duì)列等
線程間通信共享數(shù)據(jù)結(jié)構(gòu)共享數(shù)據(jù)結(jié)構(gòu)不適用
異常處理進(jìn)程間獨(dú)立線程間共享進(jìn)程間獨(dú)立
調(diào)試難度較高較低較高
適用框架通用通用通用

Python中線程和進(jìn)程簡(jiǎn)單對(duì)比如下:

  • 資源共享:線程共享同一進(jìn)程的內(nèi)存和資源,而進(jìn)程擁有獨(dú)立的內(nèi)存空間。
  • GIL影響:線程受GIL限制,進(jìn)程不受GIL限制。
  • 開銷:線程的創(chuàng)建和切換開銷小,進(jìn)程的創(chuàng)建和切換開銷大。
  • 適用性:線程適合I/O密集型任務(wù),進(jìn)程適合CPU密集型任務(wù)。
  • 通信:線程間通信簡(jiǎn)單但需要處理同步問題,進(jìn)程間通信復(fù)雜但天然隔離。

在實(shí)際應(yīng)用中,選擇使用線程還是進(jìn)程取決于任務(wù)的特性和性能需求。如果任務(wù)主要是I/O密集型,使用線程可以提高性能;如果任務(wù)是CPU密集型,使用進(jìn)程可以更好地利用多核處理器的計(jì)算能力。

共享內(nèi)存

默認(rèn)情況下,Parallel類執(zhí)行任務(wù)時(shí)各個(gè)任務(wù)不共享內(nèi)存,如下所示:

from joblib import Parallel, delayed
shared_set = set()
def collect(x):
   shared_set.add(x)
Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))

[]

通過設(shè)置require='sharedmem'可以實(shí)現(xiàn)內(nèi)存共享:

# require='sharedmem'表示需要共享內(nèi)存,以確保多個(gè)進(jìn)程可以訪問shared_set集合
Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))

[0, 1, 2, 3, 4]

上下文管理器

一些算法需要對(duì)一個(gè)并行函數(shù)進(jìn)行多次連續(xù)調(diào)用,但在循環(huán)中多次調(diào)用joblib.Parallel是次優(yōu)的,因?yàn)檫@將多次創(chuàng)建和銷毀一組工作進(jìn)程,從而導(dǎo)致顯著的性能開銷。

對(duì)于這種情況,使用joblib.Parallel類的上下文管理器API更為高效,可以重用同一組工作進(jìn)程進(jìn)行多次調(diào)用joblib.Parallel對(duì)象。如下所示:

from joblib import Parallel, delayed
import math
with Parallel(n_jobs=2) as parallel:
   accumulator = 0.
   n_iter = 0
   while accumulator < 1000:
       results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5))
       accumulator += sum(results)
       n_iter += 1
print(accumulator, n_iter)  

1136.5969161564717 14

parallel_config

Joblib提供parallel_config類用于配置并行執(zhí)行的參數(shù),比如并行的后端類型、批處理大小等,這些配置可以影響后續(xù)所有的parallel實(shí)例。它通常在調(diào)用Parallel類之前使用。關(guān)于parallel_config使用見:parallel_config

1.3 序列化

joblib.dump()和joblib.load()提供了一種替代pickle庫(kù)的方法,可以高效地序列化處理包含大量數(shù)據(jù)的任意Python對(duì)象,特別是大型的NumPy數(shù)組。關(guān)于pickle庫(kù)使用見:Python數(shù)據(jù)序列化模塊pickle使用筆記 。兩者效果對(duì)比見:

特點(diǎn)picklejoblib
性能一般針對(duì)NumPy數(shù)組等大數(shù)據(jù)類型有優(yōu)化,通常更快
并行處理不支持內(nèi)置并行處理功能,可以加速任務(wù)
內(nèi)存映射不支持支持內(nèi)存映射,可以高效處理大文件
壓縮支持支持壓縮,可以減少存儲(chǔ)空間
附加功能提供了一些額外的功能,如緩存、延遲加載等

以下代碼展示了joblib.dump的基本使用:

from tempfile import mkdtemp
# 使用mkdtemp創(chuàng)建一個(gè)臨時(shí)目錄,并將目錄路徑存儲(chǔ)在變量savedir中。
savedir = mkdtemp(dir='./')
import os
# 文件保存路徑
filename = os.path.join(savedir, 'test.joblib')
import numpy as np
import pandas as pd
import joblib
# 創(chuàng)建一個(gè)要持久化的字典
to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))]
# 使用joblib.dump函數(shù)將to_persist字典序列化并保存到filename指定的文件中
# 注意pickle庫(kù)無(wú)法序列化numpy數(shù)據(jù)
joblib.dump(to_persist, filename)

['./tmp82ms1z5w\\test.joblib']

使用joblib.load函數(shù)從指定的文件中加載之前保存的序列化數(shù)據(jù):

joblib.load(filename)

[('a', [1, 2, 3]),
 ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 ('c',
       0    1    2    3    4
  0  1.0  1.0  1.0  1.0  1.0
  1  1.0  1.0  1.0  1.0  1.0
  2  1.0  1.0  1.0  1.0  1.0
  3  1.0  1.0  1.0  1.0  1.0
  4  1.0  1.0  1.0  1.0  1.0)]

joblib.dump和joblib.load函數(shù)還接受文件對(duì)象:

with open(filename, 'wb') as fo:  
    # 使用joblib將對(duì)象to_persist序列化并寫入文件
   joblib.dump(to_persist, fo)
with open(filename, 'rb') as fo:  
   joblib.load(fo)

此外joblib.dump也支持設(shè)置compress參數(shù)以實(shí)現(xiàn)數(shù)據(jù)壓縮:

# compress參數(shù)為壓縮級(jí)別,取值為0到9,值越大壓縮效果越好。為0時(shí)表示不壓縮,默認(rèn)值為0
joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\test.joblib']

默認(rèn)情況下,joblib.dump使用zlib壓縮方法,因?yàn)樗谒俣群痛疟P空間之間實(shí)現(xiàn)了最佳平衡。其他支持的壓縮方法包括“gzip”、“bz2”、“lzma”和“xz”。compress參數(shù)輸入帶有壓縮方法和壓縮級(jí)別就可以選擇不同壓縮方法:

joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3))  
joblib.load(filename + '.gz')
joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5))  
joblib.load(filename + '.bz2')
[('a', [1, 2, 3]),
 ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 ('c',
       0    1    2    3    4
  0  1.0  1.0  1.0  1.0  1.0
  1  1.0  1.0  1.0  1.0  1.0
  2  1.0  1.0  1.0  1.0  1.0
  3  1.0  1.0  1.0  1.0  1.0
  4  1.0  1.0  1.0  1.0  1.0)]

除了默認(rèn)壓縮方法,lz4壓縮算法也可以用于數(shù)據(jù)壓縮。前提是需要安裝lz4壓縮庫(kù):

pip install lz4

在這些壓縮方法中,lz4和默認(rèn)方法效果較好。lz4使用方式與其他壓縮方式一樣:

joblib.dump(to_persist, filename, compress=('lz4', 3))  
['./tmp82ms1z5w\\test.joblib']

???????2 實(shí)例

2.1 joblib緩存和并行

本實(shí)例展示了利用joblib緩存和并行來(lái)加速任務(wù)執(zhí)行。以下代碼展示了一個(gè)高耗時(shí)任務(wù):

# 導(dǎo)入time模塊,用于實(shí)現(xiàn)延時(shí)功能
import time
# 定義一個(gè)模擬耗時(shí)計(jì)算的函數(shù)
def costly_compute(data, column):
    # 休眠1秒,模擬耗時(shí)操作
    time.sleep(1)
    # 返回傳入數(shù)據(jù)的指定列
    return data[column]
# 定義一個(gè)計(jì)算數(shù)據(jù)列平均值的函數(shù)
def data_processing_mean(data, column):
    # 調(diào)用costly_compute函數(shù)獲取指定列的數(shù)據(jù)
    column_data = costly_compute(data, column)
    # 計(jì)算并返回該列數(shù)據(jù)的平均值
    return column_data.mean()
# 導(dǎo)入numpy庫(kù),并設(shè)置隨機(jī)數(shù)生成器的種子,以保證結(jié)果的可復(fù)現(xiàn)性
import numpy as np
rng = np.random.RandomState(42)
# 生成1000行4列的隨機(jī)數(shù)據(jù)矩陣
data = rng.randn(int(1000), 4)
# 記錄開始時(shí)間
start = time.time()
# 對(duì)數(shù)據(jù)的每一列計(jì)算平均值,并將結(jié)果存儲(chǔ)在results列表中
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
# 記錄結(jié)束時(shí)間
stop = time.time()
# 打印處理過程的描述信息
print('\nSequential processing')
# 打印整個(gè)處理過程的耗時(shí)
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
Sequential processing
Elapsed time for the entire processing: 4.05 s

下段代碼演示了如何使用joblib庫(kù)來(lái)緩存和并行化計(jì)算上述任務(wù):

# 導(dǎo)入time模塊,用于模擬耗時(shí)操作。
import time
# 定義一個(gè)使用緩存的函數(shù),用于計(jì)算數(shù)據(jù)的均值。
def data_processing_mean_using_cache(data, column):
    return costly_compute_cached(data, column).mean()
# 從joblib庫(kù)導(dǎo)入Memory類,用于緩存函數(shù)的輸出。
from joblib import Memory
# 設(shè)置緩存的存儲(chǔ)位置和詳細(xì)程度
location = './cachedir'
memory = Memory(location, verbose=0)
# 使用Memory對(duì)象的cache方法來(lái)緩存costly_compute函數(shù)的輸出。
costly_compute_cached = memory.cache(costly_compute)
# 從joblib庫(kù)導(dǎo)入Parallel和delayed類,用于并行執(zhí)行函數(shù)。
from joblib import Parallel, delayed
# 記錄開始時(shí)間。
start = time.time()
# 使用Parallel類并行執(zhí)行data_processing_mean_using_cache函數(shù),對(duì)數(shù)據(jù)的每一列進(jìn)行處理。
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)  
    for col in range(data.shape[1])) 
# 記錄結(jié)束時(shí)間。
stop = time.time()
# 打印第一輪處理的耗時(shí)信息,包括緩存數(shù)據(jù)的時(shí)間。
print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
First round - caching the data
Elapsed time for the entire processing: 2.05 s

再次執(zhí)行相同的過程,可以看到結(jié)果被緩存而不是重新執(zhí)行函數(shù):

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()
print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
# 如果不想使用緩存結(jié)果,可以清除緩存信息
memory.clear(warn=False)
Second round - reloading from the cache
Elapsed time for the entire processing: 0.02 s

???????2.2 序列化

以下示例展示了在joblib.Parallel中使用序列化內(nèi)存映射(numpy.memmap)。內(nèi)存映射可以將大型數(shù)據(jù)集分割成小塊,并在需要時(shí)將其加載到內(nèi)存中。這種方法可以減少內(nèi)存使用,并提高處理速度。

定義耗時(shí)函數(shù):

import numpy as np
data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]
import time
def slow_mean(data, sl):
    time.sleep(0.01)
    return data[sl].mean()

以下代碼是直接調(diào)用函數(shù)的運(yùn)行結(jié)果:

tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.49 s

以下代碼是調(diào)用Parallel類2個(gè)進(jìn)程運(yùn)行的結(jié)果,由于整體任務(wù)計(jì)算耗時(shí)較少。所以Parallel類并行計(jì)算并沒有比直接調(diào)用函數(shù)有太多速度優(yōu)勢(shì),因?yàn)檫M(jìn)程啟動(dòng)銷毀需要額外時(shí)間:

from joblib import Parallel, delayed
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.00 s

以下代碼提供了joblib.dump和load函數(shù)加速數(shù)據(jù)讀取。其中dump函數(shù)用于將data對(duì)象序列化并保存到磁盤上的文件中,同時(shí)創(chuàng)建了一個(gè)內(nèi)存映射,使得該文件可以像內(nèi)存數(shù)組一樣被訪問。當(dāng)程序再次加載這個(gè)文件時(shí),可以使用load函數(shù)以內(nèi)存映射模式打開:

import os 
from joblib import dump, load  # 從joblib庫(kù)導(dǎo)入dump和load函數(shù),用于創(chuàng)建和加載內(nèi)存映射文件
# 設(shè)置內(nèi)存映射文件的文件夾路徑
folder = './memmap'
os.makedirs(folder, exist_ok = True)
# 將內(nèi)存映射文件的名稱與路徑結(jié)合
data_filename_memmap = os.path.join(folder, 'data_memmap.joblib')
# 使用dump函數(shù)將數(shù)據(jù)對(duì)象'data'保存到內(nèi)存映射文件
dump(data, data_filename_memmap)
# 使用load函數(shù)加載內(nèi)存映射文件,mmap_mode='r'表示以只讀模式打開
data_ = load(data_filename_memmap, mmap_mode='r')
# 記錄開始時(shí)間
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices)
# 記錄結(jié)束時(shí)間
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic))  
import shutil
# 結(jié)束時(shí)刪除映射文件
try:
    shutil.rmtree(folder)
except: 
    pass
Elapsed time computing the average of couple of slices 0.77 s

???????2.3 內(nèi)存監(jiān)視

本實(shí)例展示不同并行方式的內(nèi)存消耗情況。

創(chuàng)建內(nèi)存監(jiān)視器

from psutil import Process
from threading import Thread
class MemoryMonitor(Thread):
    """在單獨(dú)的線程中監(jiān)控內(nèi)存使用情況(以MB為單位)。"""
    def __init__(self):
        super().__init__()  # 調(diào)用父類Thread的構(gòu)造函數(shù)
        self.stop = False  # 用于控制線程停止的標(biāo)記
        self.memory_buffer = []  # 用于存儲(chǔ)內(nèi)存使用記錄的列表
        self.start()  # 啟動(dòng)線程
    def get_memory(self):
        """獲取進(jìn)程及其子進(jìn)程的內(nèi)存使用情況。"""
        p = Process()  # 獲取當(dāng)前進(jìn)程
        memory = p.memory_info().rss  # 獲取當(dāng)前進(jìn)程的內(nèi)存使用量
        for c in p.children():  # 遍歷所有子進(jìn)程
            memory += c.memory_info().rss  # 累加子進(jìn)程的內(nèi)存使用量
        return memory
    def run(self):
        """線程運(yùn)行的主體方法,周期性地記錄內(nèi)存使用情況。"""
        memory_start = self.get_memory()  # 獲取初始內(nèi)存使用量
        while not self.stop:  # 當(dāng)未設(shè)置停止標(biāo)記時(shí)循環(huán)
            self.memory_buffer.append(self.get_memory() - memory_start)  # 記錄當(dāng)前內(nèi)存使用量與初始內(nèi)存使用量的差值
            time.sleep(0.2)  # 休眠0.2秒
    def join(self):
        """重寫join方法,設(shè)置停止標(biāo)記并等待線程結(jié)束。"""
        self.stop = True  # 設(shè)置停止標(biāo)記
        super().join()  # 調(diào)用父類方法等待線程結(jié)束

并行任務(wù)

結(jié)果返回list的并行任務(wù):

import time
import numpy as np
def return_big_object(i):
    """生成并返回一個(gè)大型NumPy數(shù)組對(duì)象。"""
    time.sleep(.1)  # 休眠0.1秒模擬耗時(shí)操作
    return i * np.ones((10000, 200), dtype=np.float64) 
def accumulator_sum(generator):
    """累加生成器生成的所有值,并打印進(jìn)度。"""
    result = 0
    for value in generator:
        result += value
        # print(".", end="", flush=True)  # 打印點(diǎn)號(hào)并刷新輸出
    # print("")  # 打印換行符
    return result
from joblib import Parallel, delayed
monitor = MemoryMonitor()  # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視
print("Running tasks with return_as='list'...")  # 打印啟動(dòng)任務(wù)信息
res = Parallel(n_jobs=2, return_as="list")(
    delayed(return_big_object)(i) for i in range(150)  # 使用joblib的Parallel功能并行執(zhí)行任務(wù)
)
res = accumulator_sum(res)  # 累加結(jié)果
print('All tasks completed and reduced successfully.')  # 打印任務(wù)完成信息
# 報(bào)告內(nèi)存使用情況
del res  # 清理結(jié)果以避免內(nèi)存邊界效應(yīng)
monitor.join()  # 等待內(nèi)存監(jiān)控線程結(jié)束
peak = max(monitor.memory_buffer) / 1e9  # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB
print(f"Peak memory usage: {peak:.2f}GB")  # 打印峰值內(nèi)存使用量
Running tasks with return_as='list'...
All tasks completed and reduced successfully.
Peak memory usage: 2.44GB

如果改為輸出生成器,那么內(nèi)存使用量將會(huì)大大減少:

monitor_gen = MemoryMonitor()  # 創(chuàng)建內(nèi)存監(jiān)控器實(shí)例,并啟動(dòng)監(jiān)視
print("Running tasks with return_as='generator'...")  # 打印啟動(dòng)任務(wù)信息
res = Parallel(n_jobs=2, return_as="generator")(
    delayed(return_big_object)(i) for i in range(150)  
)
res = accumulator_sum(res)  # 累加結(jié)果
print('All tasks completed and reduced successfully.')  # 打印任務(wù)完成信息
# 報(bào)告內(nèi)存使用情況
del res  # 清理結(jié)果以避免內(nèi)存邊界效應(yīng)
monitor_gen.join()  # 等待內(nèi)存監(jiān)控線程結(jié)束
peak = max(monitor_gen.memory_buffer) / 1e9  # 計(jì)算峰值內(nèi)存使用量,并轉(zhuǎn)換為GB
print(f"Peak memory usage: {peak:.2f}GB")  # 打印峰值內(nèi)存使用量
Running tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 0.19GB

下圖展示了以上兩種方法的內(nèi)存消耗情況,第一種情況涉及到將所有結(jié)果存儲(chǔ)在內(nèi)存中,直到處理完成,這可能導(dǎo)致內(nèi)存使用量隨著時(shí)間線性增長(zhǎng)。而第二種情況generator則涉及到流式處理,即結(jié)果被實(shí)時(shí)處理,因此不需要同時(shí)在內(nèi)存中存儲(chǔ)所有結(jié)果,從而減少了內(nèi)存使用的需求:

import matplotlib.pyplot as plt
plt.figure(0)
plt.semilogy(
    np.maximum.accumulate(monitor.memory_buffer),
    label='return_as="list"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_gen.memory_buffer),
    label='return_as="generator"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()

進(jìn)一步節(jié)省內(nèi)存

前一個(gè)例子中的生成器是保持任務(wù)提交的順序的。如果某些進(jìn)程任務(wù)提交晚,但比其他任務(wù)更早完成。相應(yīng)的結(jié)果會(huì)保持在內(nèi)存中,以等待其他任務(wù)完成。如果任務(wù)對(duì)結(jié)果返回順序無(wú)要求,例如最后只是對(duì)所有結(jié)果求和,可以使用generator_unordered減少內(nèi)存消耗。如下所示:

# 創(chuàng)建一個(gè)每個(gè)任務(wù)耗時(shí)可能不同的處理函數(shù)
def return_big_object_delayed(i):
    if (i + 20) % 60:
        time.sleep(0.1)
    else:
        time.sleep(5)
    return i * np.ones((10000, 200), dtype=np.float64)

返回為generator格式的內(nèi)存使用:

monitor_delayed_gen = MemoryMonitor()
print("Create result generator on delayed tasks with return_as='generator'...")
res = Parallel(n_jobs=2, return_as="generator")(
    delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res 
monitor_delayed_gen.join()
peak = max(monitor_delayed_gen.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 784.23MB

返回為generator_unordered格式的內(nèi)存使用:

monitor_delayed_gen_unordered = MemoryMonitor()
print(
  "Create result generator on delayed tasks with "
  "return_as='generator_unordered'..."
)
res = Parallel(n_jobs=2, return_as="generator_unordered")(
    delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res 
monitor_delayed_gen_unordered.join()
peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator_unordered'...
All tasks completed and reduced successfully.
Peak memory usage: 175.22MB

內(nèi)存使用結(jié)果對(duì)比如下?;趃enerator_unordered選項(xiàng)在執(zhí)行任務(wù)時(shí),能夠獨(dú)立地處理每個(gè)任務(wù),而不需要依賴于其他任務(wù)的完成狀態(tài)。但是要注意的是由于系統(tǒng)負(fù)載、后端實(shí)現(xiàn)等多種可能影響任務(wù)執(zhí)行順序的因素,結(jié)果的返回順序是不確定的:

plt.figure(1)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen.memory_buffer),
    label='return_as="generator"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer),
    label='return_as="generator_unordered"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()

3 參考

本文來(lái)自博客園,作者:落痕的寒假,轉(zhuǎn)載請(qǐng)注明原文鏈接:https://www.cnblogs.com/luohenyueji/p/18351959

到此這篇關(guān)于Python并行計(jì)算庫(kù)Joblib使用指北的文章就介紹到這了,更多相關(guān)Python Joblib使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python使用matplotlib繪制雷達(dá)圖

    python使用matplotlib繪制雷達(dá)圖

    這篇文章主要為大家詳細(xì)介紹了python使用matplotlib繪制雷達(dá)圖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-10-10
  • Django框架ORM操作數(shù)據(jù)庫(kù)不生效問題示例解決方法

    Django框架ORM操作數(shù)據(jù)庫(kù)不生效問題示例解決方法

    本文詳細(xì)描述使用Django 的ORM框架操作PostgreSQL數(shù)據(jù)庫(kù)刪除不生效問題的定位過程及解決方案,并總結(jié)使用ORM框架操作數(shù)據(jù)庫(kù)不生效的問題的通用定位方法,感興趣的朋友跟隨小編一起看看吧
    2023-01-01
  • 如何在Python中將字符串轉(zhuǎn)換為數(shù)組詳解

    如何在Python中將字符串轉(zhuǎn)換為數(shù)組詳解

    最近在用Python,做一個(gè)小腳本,有個(gè)操作就是要把內(nèi)容換成數(shù)組對(duì)象再進(jìn)行相關(guān)操作,下面這篇文章主要給大家介紹了關(guān)于如何在Python中將字符串轉(zhuǎn)換為數(shù)組的相關(guān)資料,需要的朋友可以參考下
    2022-12-12
  • Python使用Numpy實(shí)現(xiàn)Kmeans算法的步驟詳解

    Python使用Numpy實(shí)現(xiàn)Kmeans算法的步驟詳解

    將物理或抽象對(duì)象的集合分成由類似的對(duì)象組成的多個(gè)類的過程被稱為聚類。這篇文章主要介紹了Python使用Numpy實(shí)現(xiàn)Kmeans算法,需要的朋友可以參考下
    2021-11-11
  • python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例

    python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例

    這篇文章主要介紹了python實(shí)現(xiàn)本地批量ping多個(gè)IP的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • 獲取python文件擴(kuò)展名和文件名方法

    獲取python文件擴(kuò)展名和文件名方法

    本篇文章通過python寫一個(gè)獲取python文件擴(kuò)展名和文件名的功能,并分享了代碼,有興趣的參考下。
    2018-02-02
  • 利用Python來(lái)控制終端打印字體的顏色和格式

    利用Python來(lái)控制終端打印字體的顏色和格式

    使用python編程時(shí),改變控制臺(tái)或終端中輸出字體的顏色和格式,會(huì)顯著提升代碼質(zhì)量,快速幫助我們定位問題和鎖定重要輸出,但是一般情況下,python控制臺(tái)輸出的字體默認(rèn)為白色,所以這篇文章給大家介紹了如何利用Python控制終端打印字體的顏色和格式,需要的朋友可以參考下
    2024-06-06
  • django數(shù)據(jù)庫(kù)自動(dòng)重連的方法實(shí)例

    django數(shù)據(jù)庫(kù)自動(dòng)重連的方法實(shí)例

    這篇文章主要給大家介紹了關(guān)于django數(shù)據(jù)庫(kù)自動(dòng)重連的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用django具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • Python連接MySQL數(shù)據(jù)庫(kù)并查找表信息

    Python連接MySQL數(shù)據(jù)庫(kù)并查找表信息

    本文主要介紹了Python連接MySQL數(shù)據(jù)庫(kù)并查找表信息,通過使用Python中的MySQL Connector模塊,連接到MySQL服務(wù)器并執(zhí)行SQL查詢語(yǔ)句,可以獲取表的結(jié)構(gòu)、列信息、行數(shù)據(jù)等,感興趣的可以了解一下
    2023-08-08
  • Django使用rest_framework寫出API

    Django使用rest_framework寫出API

    這篇文章主要介紹了Django使用rest_framework寫出API,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05

最新評(píng)論