Several Ways to Parallelize pandas apply, Explained
1. pandarallel (pip install )
For the simple case of a pandas DataFrame df and a function func to apply to it, just replace the classic apply with parallel_apply.
from pandarallel import pandarallel

# Initialization
pandarallel.initialize()

# Standard pandas apply
df.apply(func)

# Parallel apply
df.parallel_apply(func)
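Note that the classic apply method still works whenever you do not want to parallelize a computation.
You can also pass progress_bar=True to the initialize function to display one progress bar per worker CPU.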
2. joblib (pip install )
https://pypi.python.org/pypi/joblib
# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
import time
from math import sqrt

from joblib import Parallel, delayed


def test():
    start = time.time()
    # n_jobs=1: run all 10,000 tasks sequentially in the current process
    result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
    end = time.time()
    print(end - start)
    # n_jobs=8: dispatch the same tasks to 8 worker processes
    result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
    end2 = time.time()
    print(end2 - end)


if __name__ == '__main__':
    test()
-------Output----------
0.4434356689453125
0.6346755027770996
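Both runs dispatch 10,000 tasks, and each task does almost nothing (one sqrt call). The cost of sending tasks to worker processes and collecting their results therefore most likely outweighs any gain, which would explain why n_jobs=8 came out slower than n_jobs=1. joblib already groups short tasks through Parallel's batch_size argument ('auto' by default). A more explicit remedy is to make each task process a whole range of inputs, so that only one task per job crosses a process boundary. The sketch below assumes the work can be split by index range; the helper names sqrt_batch and batched_sqrt are hypothetical.

```python
from math import sqrt

from joblib import Parallel, delayed


def sqrt_batch(start: int, stop: int) -> list:
    """Do the work for a whole index range inside a single task."""
    return [sqrt(i ** 2) for i in range(start, stop)]


def batched_sqrt(n: int, n_jobs: int = 8) -> list:
    # One contiguous batch per job (ceiling division, never a zero step)
    step = max(1, -(-n // n_jobs))
    batches = Parallel(n_jobs=n_jobs)(
        delayed(sqrt_batch)(start, min(start + step, n))
        for start in range(0, n, step)
    )
    # Parallel returns results in submission order; flatten the per-batch lists
    return [x for batch in batches for x in batch]


if __name__ == "__main__":
    result = batched_sqrt(10000)
    assert result == [float(i) for i in range(10000)]
    print(len(result))
```

With this layout the 8-job run submits 8 tasks instead of 10,000. Any remaining difference from n_jobs=1 then reflects mostly process startup rather than per-task dispatch.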
3. multiprocessing
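import multiprocessing as mp

# f must be a module-level (picklable) function and df an existing DataFrame;
# on Windows and macOS, run this under an if __name__ == '__main__': guard
with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

multiprocessing.cpu_count()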
Return the number of CPUs in the system.
This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).
May raise NotImplementedError.
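If the process is pinned to a subset of cores (for example with taskset, or inside some containers), sizing the pool with cpu_count() can start more workers than there are CPUs available to it. Below is a minimal sketch that prefers the affinity-based count and falls back to the system total on platforms without os.sched_getaffinity. The helper name usable_cpu_count is hypothetical.

```python
import multiprocessing as mp
import os


def usable_cpu_count() -> int:
    """CPUs this process may run on, falling back to the system total."""
    try:
        # Respects the process's CPU affinity mask (e.g. set with taskset);
        # only available on some Unix platforms such as Linux.
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # Not available (e.g. macOS, Windows): fall back to the total CPU count.
        # os.cpu_count() may return None, so default to 1 in that case.
        return os.cpu_count() or 1


if __name__ == "__main__":
    print("multiprocessing.cpu_count():", mp.cpu_count())
    print("usable CPUs:", usable_cpu_count())
```

The result can then be passed as mp.Pool(usable_cpu_count()) in place of mp.Pool(mp.cpu_count()).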
4. Performance comparison of the methods
(1) Code
import time
import multiprocessing as mp

import pandas as pd
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm


def get_url_len(url):
    url_list = url.split(".")
    # Sleep 0.01 s to simulate complex logic; the first set of results below was measured without this line
    time.sleep(0.01)
    return len(url_list)


def test1(data):
    """No optimization: plain pandas apply"""
    start = time.time()
    data['len'] = data['url'].apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("res:{}, cost time:{}".format(res, cost_time))


def test_mp(data):
    """Optimized with multiprocessing"""
    start = time.time()
    with mp.Pool(mp.cpu_count()) as pool:
        data['len'] = pool.map(get_url_len, data['url'])
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_mp \t res:{}, cost time:{}".format(res, cost_time))


def test_pandarallel(data):
    """Optimized with pandarallel"""
    start = time.time()
    pandarallel.initialize()  # initialization is included in the measured time
    data['len'] = data['url'].parallel_apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))


def test_delayed(data):
    """Optimized with joblib's delayed"""
    def key_func(subset):
        subset["len"] = subset["url"].apply(get_url_len)
        return subset

    start = time.time()
    data_grouped = data.groupby(data.index)
    # data_grouped is iterable, so tqdm can wrap it to display a progress bar
    results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
    data = pd.concat(results)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))


if __name__ == '__main__':
    columns = ['title', 'url', 'pub_old', 'pub_new']
    temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
    data = temp
    # To enlarge the data set, repeat it 100 times (DataFrame.append was removed in pandas 2.0):
    # data = pd.concat([temp] * 100)
    print(len(data))
    # Uncomment the variants to compare:
    # test1(data)
    # test_mp(data)
    # test_pandarallel(data)
    test_delayed(data)
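Each test function above repeats the same start/end timing boilerplate. One way to factor it out is a small context manager, sketched here under the hypothetical name timed. It records elapsed wall-clock time with time.perf_counter, which is monotonic and intended for measuring intervals, unlike time.time.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Record the wall-clock seconds spent inside the with-block as results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Stored even if the block raises, so a failed run still reports its time
        results[label] = time.perf_counter() - start


if __name__ == "__main__":
    timings = {}
    with timed("sleep_0.05", timings):
        time.sleep(0.05)
    for label, seconds in timings.items():
        print("{}\tcost time:{:.4f}".format(label, seconds))
```

Wrapping each variant as `with timed("test_mp", timings): ...` would collect all measurements in one dict for side-by-side printing.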
(2) Output
Each block is labeled with its number of rows. The first, unprefixed line of each block comes from test1 (plain apply). These runs were made without the sleep in get_url_len.
1,000 rows
res:4338, cost time:0.0018074512481689453
test_mp res:4338, cost time:0.2626469135284424
test_pandarallel res:4338, cost time:0.3467681407928467
10,000 rows
res:42936, cost time:0.008773326873779297
test_mp res:42936, cost time:0.26111721992492676
test_pandarallel res:42936, cost time:0.33237743377685547
100,000 rows
res:426742, cost time:0.07944369316101074
test_mp res:426742, cost time:0.294996976852417
test_pandarallel res:426742, cost time:0.39208269119262695
1,000,000 rows
res:4267420, cost time:0.8074917793273926
test_mp res:4267420, cost time:0.9741342067718506
test_pandarallel res:4267420, cost time:0.6779992580413818
10,000,000 rows
res:42674200, cost time:8.027287006378174
test_mp res:42674200, cost time:7.751036882400513
test_pandarallel res:42674200, cost time:4.404983282089233
With the time.sleep(0.01) in get_url_len enabled (to simulate complex logic), the results for 1,000 rows are:
1,000 rows
res:4338, cost time:10.054503679275513
test_mp res:4338, cost time:0.35697126388549805
test_pandarallel res:4338, cost time:0.43415403366088867
test_delayed res:4338, cost time:2.294757843017578
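test_delayed is the slowest parallel variant here. The most likely reason is that groupby(data.index) on a unique index produces one group per row, so joblib receives 1,000 separate tasks, each carrying a one-row DataFrame. The sketch below instead hands each job one contiguous block of rows, keeping n_jobs=8 as in the original test. A simplified copy of get_url_len is included so the block runs on its own. process_block and chunked_apply are hypothetical names.

```python
import time

import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def get_url_len(url):
    # Simplified copy of the article's function; the sleep simulates heavier logic
    time.sleep(0.01)
    return len(url.split("."))


def process_block(block: pd.DataFrame) -> pd.DataFrame:
    # One task per block: an ordinary apply over many rows at once
    block = block.copy()
    block["len"] = block["url"].apply(get_url_len)
    return block


def chunked_apply(data: pd.DataFrame, n_jobs: int = 8) -> pd.DataFrame:
    if data.empty:
        return process_block(data)
    # Positional row ranges, one per job (np.array_split allows uneven sizes)
    row_blocks = np.array_split(np.arange(len(data)), n_jobs)
    results = Parallel(n_jobs=n_jobs)(
        delayed(process_block)(data.iloc[rows]) for rows in row_blocks if len(rows)
    )
    # Blocks come back in submission order, so the original row order is kept
    return pd.concat(results)


if __name__ == "__main__":
    df = pd.DataFrame({"url": ["www.example.com", "a.b", "localhost"] * 100})
    start = time.perf_counter()
    out = chunked_apply(df)
    print("res:{}, cost time:{}".format(out["len"].sum(), time.perf_counter() - start))
```

This keeps the task count equal to n_jobs regardless of data size. It is the same granularity the chunked multiprocessing approach uses.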
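5. Summary
(1) With small amounts of data, parallel processing is slower than plain serial execution. At 1,000 rows, test_mp and test_pandarallel took roughly 0.26 to 0.35 s, mostly a near-constant startup overhead, while the plain apply finished in about 0.002 s.
(2) If the function passed to apply is cheap, parallel processing is also slower unless the data is large. Without the sleep, pandarallel only overtook the plain apply at 1,000,000 rows, and multiprocessing only at 10,000,000 rows. With the 0.01 s sleep, every parallel variant was faster even at 1,000 rows.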
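Both observations fit a simple cost model, offered as a rough approximation rather than a measured law. A serial apply costs about n × t (rows times per-row time). A parallel apply costs a roughly fixed startup overhead plus about n × t divided by the number of workers. The sleep experiment is consistent with this: 1,000 rows × 0.01 s ≈ 10 s serial, close to the measured 10.05 s. In the sketch below, the function names are hypothetical, and the 0.26 s overhead is taken from the test_mp measurements above on the author's machine. Per-row data-transfer cost is ignored.

```python
import math


def serial_time(n_rows: int, per_row_s: float) -> float:
    """Estimated seconds for a plain apply: rows processed one after another."""
    return n_rows * per_row_s


def parallel_time(n_rows: int, per_row_s: float, n_workers: int, overhead_s: float) -> float:
    """Estimated seconds for a parallel apply (ignores per-row data transfer)."""
    # Each worker handles about ceil(n / workers) rows; startup cost is paid once
    rows_per_worker = math.ceil(n_rows / n_workers)
    return overhead_s + rows_per_worker * per_row_s


def worth_parallelizing(n_rows: int, per_row_s: float, n_workers: int, overhead_s: float) -> bool:
    return parallel_time(n_rows, per_row_s, n_workers, overhead_s) < serial_time(n_rows, per_row_s)


if __name__ == "__main__":
    overhead = 0.26  # roughly test_mp's fixed cost in the measurements above
    # Sleep experiment: 1,000 rows at 0.01 s each, about 10 s serially
    print(serial_time(1000, 0.01))
    print(worth_parallelizing(1000, 0.01, 8, overhead))  # heavy per-row work
    # Cheap function: about 2 microseconds per row (0.0018 s / 1,000 rows above)
    print(worth_parallelizing(1000, 2e-6, 8, overhead))  # overhead dominates
```

With 8 workers the model predicts about 0.26 + 125 × 0.01 = 1.51 s for the sleep case, well under the serial 10 s. For the cheap function the fixed overhead alone is already about 100 times the serial cost.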
6. Problems and solutions
(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
https://www.jianshu.com/p/0be1b4b27bde
(2) Checking the number of physical CPUs, cores, and logical CPUs on Linux
https://lover.blog.csdn.net/article/details/113951192
(3) Using progress bars
http://www.dbjr.com.cn/article/206219.htm