Python詳解復(fù)雜CSV文件處理方法
項(xiàng)目簡介
鑒于項(xiàng)目保密的需要,不便透露太多項(xiàng)目的信息,因此,簡單介紹一下項(xiàng)目存在的難點(diǎn):
- 海量數(shù)據(jù):項(xiàng)目是對CSV文件中的數(shù)據(jù)進(jìn)行處理,而特點(diǎn)是數(shù)據(jù)量大...真的大?。?!拿到的第一個CSV示例文件是110多萬行(小CASE),而第二個文件就到了4500萬行,等到第三個文件......好吧,一直沒見到第三個完整示例文件,因?yàn)樘罅耍瑩?jù)說是第二個示例文件的40多倍,大概二十億行......
- 業(yè)務(wù)邏輯復(fù)雜:項(xiàng)目是需要對CSV文件的每一行數(shù)據(jù)的各種組合可能性進(jìn)行判斷,而判斷的業(yè)務(wù)邏輯較為復(fù)雜,如何在解決復(fù)雜邏輯的同時保證較高的處理效率是難點(diǎn)之一。
項(xiàng)目筆記與心得
1.分批處理與多進(jìn)程及多線程加速
- 因?yàn)閿?shù)據(jù)量太大,肯定是要分批對數(shù)據(jù)進(jìn)行處理,否則,效率低不談,大概率也沒有足夠的內(nèi)存能夠支撐,需要用到chunksize,此外,為了節(jié)約內(nèi)存,以及提高處理效率,可以將文本類的數(shù)據(jù)存儲為“category”格式:
- 項(xiàng)目整體是計(jì)算密集型的任務(wù),因此,需要用到多進(jìn)程,充分利用CPU的多核性能;
- 多線程進(jìn)行讀取與寫入,其中,寫入使用to_csv的增量寫入方法,mode參數(shù)設(shè)置為'a';
- 多進(jìn)程與多線程開啟一般為死循環(huán),需要在合適的位置,放入結(jié)束循環(huán)的信號,以便處理完畢后退出多進(jìn)程或多線程
"""鑒于項(xiàng)目保密需要,以下代碼僅為示例""" import time import pathlib as pl import pandas as pd from threading import Thread from multiprocessing import Queue, Process, cpu_count # 導(dǎo)入多線程Thread,多進(jìn)程的隊(duì)列Queue,多進(jìn)程Process,CPU核數(shù)cpu_count # 存放分段讀取的數(shù)據(jù)隊(duì)列,注:maxsize控制隊(duì)列的最大數(shù)量,避免一次性讀取到內(nèi)存中的數(shù)據(jù)量太大 data_queue = Queue(maxsize=cpu_count() * 2) # 存放等待寫入磁盤的數(shù)據(jù)隊(duì)列 write_queue = Queue() def read_data(path: pl.Path, data_queue: Queue, size: int = 10000): """ 讀取數(shù)據(jù)放入隊(duì)列的方法 :return: """ data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category') for idx, df in enumerate(data_obj): while data_queue.full(): # 如果隊(duì)列滿了,那就等待 time.sleep(1) data_queue.put((idx + 1, df)) data_queue.put((None, None)) # 放入結(jié)束信號 def write_data(out_path: pl.Path, write_queue: Queue): """ 將數(shù)據(jù)增量寫入CSV的方法 :return: """ while True: while write_queue.empty(): time.sleep(1) idx, df = write_queue.get() if df is None: return # 結(jié)束退出 df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi') # 輸出CSV def parse_data(data_queue: Queue, write_queue: Queue): """ 從隊(duì)列中取出數(shù)據(jù),并加工的方法 :return: """ while True: while write_queue.empty(): time.sleep(1) idx, df = data_queue.get() if df is None: # 如果是空的結(jié)束信號,則結(jié)束退出進(jìn)程, # 特別注意結(jié)束前把結(jié)束信號放回隊(duì)列,以便其他進(jìn)程也能接收到結(jié)束信號?。?! data_queue.put((idx, df)) return """處理數(shù)據(jù)的業(yè)務(wù)邏輯略過""" write_queue.put((idx, df)) # 將處理后的數(shù)據(jù)放入寫隊(duì)列 # 創(chuàng)建一個讀取數(shù)據(jù)的線程 read_pool = Thread(target=read_data, args=(read_data_queue, *args)) read_pool.start() # 開啟讀取線程 # 創(chuàng)建一個增量寫入CSV數(shù)據(jù)的線程 write_pool = Thread(target=write_data, args=(write_data_queue, *args)) write_pool.start() # 開啟寫進(jìn)程 pools = [] # 存放解析進(jìn)程的隊(duì)列 for i in range(cpu_count()): # 循環(huán)開啟多進(jìn)程,不確定開多少個進(jìn)程合適的情況下,那么按CPU的核數(shù)開比較合理 pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args)) pool.start() # 啟動進(jìn)程 pools.append(pool) # 加入隊(duì)列 for pool in pools: pool.join() # 等待所有解析進(jìn)程完成 # 所有解析進(jìn)程完成后,在寫隊(duì)列放入結(jié)束寫線程的信號 write_data_queue.put((None, None)) write_pool.join() # 等待寫線程結(jié)束 print('任務(wù)完成')
2.優(yōu)化算法提高效率
將類對象存入dataframe列
在嘗試了n種方案之后,最終使用了將類對象存到dataframe的列中,使用map方法,運(yùn)行類方法,最后,將運(yùn)行結(jié)果展開到多列中的方式。該方案本項(xiàng)目中取得了最佳的處理效率。
"""鑒于保密需要,以下代碼僅為示例""" class Obj: def __init__(self, ser: pd.Series): """ 初始化類對象 :param ser: 傳入series """ self.ser = ser # 行數(shù)據(jù) self.attrs1 = [] # 屬性1 self.attrs2 = [] # 屬性2 self.attrs3 = [] # 屬性3 def __repr__(self): """ 自定義輸出 """ attrs1 = '_'.join([str(a) for a in self.attrs1]) attrs2 = '_'.join([str(a) for a in self.attrs2]) attrs3 = '_'.join([str(a) for a in self.attrs3]) return '_'.join([attrs1, attrs2, attrs3]) def run(self): """運(yùn)行業(yè)務(wù)邏輯""" # 創(chuàng)建obj列,存入類對象 data['obj'] = data.apply(lambda x: Obj(x), axis=1) # 運(yùn)行obj列中的類方法獲得判斷結(jié)果 data['obj'] = data['obj'].map(lambda x: x.run()) # 鏈?zhǔn)秸{(diào)用,1將類對象文本化->2拆分到多列->3刪除空列->4轉(zhuǎn)換為category格式 data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category') # 刪除obj列 data.drop(columns='obj', inplace=True)
減少計(jì)算次數(shù)以提高運(yùn)行效率
在整個優(yōu)化過程中,對運(yùn)行效率產(chǎn)生最大優(yōu)化效果的有兩項(xiàng):
- 一是改變遍歷算法,采用直接對整行數(shù)據(jù)進(jìn)行綜合判斷的方法,使原需要遍歷22個組合的計(jì)算與判斷大大減少
- 二是提前計(jì)算特征組合,制作成字典,后續(xù)直接查詢結(jié)果,而不再進(jìn)行重復(fù)計(jì)算
使用numpy加速計(jì)算
numpy還是數(shù)據(jù)處理上的神器,使用numpy的方法,比自己實(shí)現(xiàn)的方法效率要高非常多,本項(xiàng)目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了運(yùn)行效率,又解決了邏輯判斷的問題:
"""numpy方法使用示例""" import numpy as np # 計(jì)算數(shù)字的個數(shù)組合bincount np.bincount([9, 2, 13, 12, 9, 10, 11]) # 輸出結(jié)果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64) # 取得個數(shù)最多的數(shù)字argmax np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11])) # 輸出結(jié)果: 9 # 將數(shù)字按照個數(shù)優(yōu)先,其次大小進(jìn)行排序argsort np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])) # 輸出結(jié)果:array([ 0, 1, 3, 4, 5, 6, 7, 8, 2, 10, 11, 12, 13, 9], dtype=int64) # 翻轉(zhuǎn)列表flipud np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))) # 輸出結(jié)果: array([ 9, 13, 12, 11, 10, 2, 8, 7, 6, 5, 4, 3, 1, 0], dtype=int64) # 查找相同值in1d np.in1d([2, 3, 4], [2, 9, 3]) # 輸出結(jié)果: array([ True, True, False]) 注:指2,3True,4False np.all(np.in1d([2, 3], [2, 9, 3])) # 輸出結(jié)果: array([ True, True]) # 是否全是all np.all(np.in1d([2, 3, 4], [2, 9, 3])) # 判斷組合1是否包含在組合2中 # 輸出結(jié)果: False np.all(np.in1d([2, 3], [2, 9, 3])) # 輸出結(jié)果: True
優(yōu)化前后的效率對比
總結(jié)
優(yōu)化算法是在這個項(xiàng)目上時間花費(fèi)最多的工作(沒有之一)。4月12日接單,10天左右出了第1稿,雖能運(yùn)行,但回頭看存在兩個問題:一是有bug需要修正,二是運(yùn)行效率不高(4500萬行數(shù)據(jù),執(zhí)行需要1小時21分鐘,如果只是在這個版本上debug需要增加判斷條件,效率只會更低);后20多天是在不斷的優(yōu)化算法的同時對bug進(jìn)行修正,最后版本執(zhí)行相同數(shù)據(jù)只需要不足30分鐘,效率提高了一倍多?;仡檨砜?,雖然調(diào)優(yōu)花費(fèi)的時間多,但是每一個嘗試不論成功還是失敗都是一次寶貴的經(jīng)驗(yàn)積累。
到此這篇關(guān)于Python詳解復(fù)雜CSV文件處理方法的文章就介紹到這了,更多相關(guān)Python CSV文件處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python對多個sheet表進(jìn)行整合實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于Python對多個sheet表進(jìn)行整合實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-04-04Python 照片人物背景替換的實(shí)現(xiàn)方法
本文主要介紹了如何通過Python實(shí)現(xiàn)照片中人物背景圖的替換,甚至可以精細(xì)到頭發(fā)絲,感興趣的小伙伴可以看看2021-11-11Python 可視化matplotlib模塊基礎(chǔ)知識
這篇文章主要給大家分享的是Python 可視化matplotlib模塊基礎(chǔ)知識,文章對matplotlib.pyplot 模塊繪制相關(guān)如折線、柱狀、散點(diǎn)、圓餅圖表進(jìn)行簡單地學(xué)習(xí),具有一定的參考價值,需要的朋友可以參考一下2021-12-12Python關(guān)于print的操作(倒計(jì)時、轉(zhuǎn)圈顯示、進(jìn)度條)
這篇文章主要介紹了Python關(guān)于print的操作(倒計(jì)時、轉(zhuǎn)圈顯示、進(jìn)度條),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05python3.5 + PyQt5 +Eric6 實(shí)現(xiàn)的一個計(jì)算器代碼
這篇文章主要介紹了python3.5 + PyQt5 +Eric6 實(shí)現(xiàn)的一個計(jì)算器代碼,在windows7 32位系統(tǒng)可以完美運(yùn)行 計(jì)算器,有興趣的可以了解一下。2017-03-03Django調(diào)用百度AI接口實(shí)現(xiàn)人臉注冊登錄代碼實(shí)例
這篇文章主要介紹了Django調(diào)用百度AI接口實(shí)現(xiàn)人臉注冊登錄,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04Python用于學(xué)習(xí)重要算法的模塊pygorithm實(shí)例淺析
這篇文章主要介紹了Python用于學(xué)習(xí)重要算法的模塊pygorithm,結(jié)合實(shí)例形式簡單分析了pygorithm模塊的功能、算法調(diào)用、源碼獲取、時間復(fù)雜度計(jì)算等相關(guān)操作技巧,需要的朋友可以參考下2018-08-08