欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python詳解復(fù)雜CSV文件處理方法

 更新時間:2022年07月19日 15:38:54   作者:畢加鎖  
這篇文章主要介紹了Python數(shù)據(jù)讀寫之Python讀寫CSV文件,CSV即逗號分隔值,一種以逗號分隔按行存儲的文本文件,所有的值都表現(xiàn)為字符串類型,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下

項(xiàng)目簡介

鑒于項(xiàng)目保密的需要,不便透露太多項(xiàng)目的信息,因此,簡單介紹一下項(xiàng)目存在的難點(diǎn):

  • 海量數(shù)據(jù):項(xiàng)目是對CSV文件中的數(shù)據(jù)進(jìn)行處理,而特點(diǎn)是數(shù)據(jù)量大...真的大?。?!拿到的第一個CSV示例文件是110多萬行(小CASE),而第二個文件就到了4500萬行,等到第三個文件......好吧,一直沒見到第三個完整示例文件,因?yàn)樘罅耍瑩?jù)說是第二個示例文件的40多倍,大概二十億行......
  • 業(yè)務(wù)邏輯復(fù)雜:項(xiàng)目是需要對CSV文件的每一行數(shù)據(jù)的各種組合可能性進(jìn)行判斷,而判斷的業(yè)務(wù)邏輯較為復(fù)雜,如何在解決復(fù)雜邏輯的同時保證較高的處理效率是難點(diǎn)之一。

項(xiàng)目筆記與心得

1.分批處理與多進(jìn)程及多線程加速

  • 因?yàn)閿?shù)據(jù)量太大,肯定是要分批對數(shù)據(jù)進(jìn)行處理,否則,效率低不談,大概率也沒有足夠的內(nèi)存能夠支撐,需要用到chunksize,此外,為了節(jié)約內(nèi)存,以及提高處理效率,可以將文本類的數(shù)據(jù)存儲為“category”格式:
  • 項(xiàng)目整體是計(jì)算密集型的任務(wù),因此,需要用到多進(jìn)程,充分利用CPU的多核性能;
  • 多線程進(jìn)行讀取與寫入,其中,寫入使用to_csv的增量寫入方法,mode參數(shù)設(shè)置為'a';
  • 多進(jìn)程與多線程開啟一般為死循環(huán),需要在合適的位置,放入結(jié)束循環(huán)的信號,以便處理完畢后退出多進(jìn)程或多線程
"""鑒于項(xiàng)目保密需要,以下代碼僅為示例"""
import time
import pathlib as pl
import pandas as pd
from threading import Thread
from multiprocessing import Queue, Process, cpu_count
# 導(dǎo)入多線程Thread,多進(jìn)程的隊(duì)列Queue,多進(jìn)程Process,CPU核數(shù)cpu_count
# 存放分段讀取的數(shù)據(jù)隊(duì)列,注:maxsize控制隊(duì)列的最大數(shù)量,避免一次性讀取到內(nèi)存中的數(shù)據(jù)量太大
data_queue = Queue(maxsize=cpu_count() * 2)  
# 存放等待寫入磁盤的數(shù)據(jù)隊(duì)列
write_queue = Queue()  
def read_data(path: pl.Path, data_queue: Queue, size: int = 10000):
    """
    讀取數(shù)據(jù)放入隊(duì)列的方法
    :return:
    """
    data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category')
    for idx, df in enumerate(data_obj):
        while data_queue.full():  # 如果隊(duì)列滿了,那就等待
            time.sleep(1)
        data_queue.put((idx + 1, df))
    data_queue.put((None, None))  # 放入結(jié)束信號
def write_data(out_path: pl.Path, write_queue: Queue):
    """
    將數(shù)據(jù)增量寫入CSV的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = write_queue.get()
        if df is None:
            return  # 結(jié)束退出
        df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi')  # 輸出CSV
def parse_data(data_queue: Queue, write_queue: Queue):
    """
    從隊(duì)列中取出數(shù)據(jù),并加工的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = data_queue.get()
        if df is None:  # 如果是空的結(jié)束信號,則結(jié)束退出進(jìn)程,
        # 特別注意結(jié)束前把結(jié)束信號放回隊(duì)列,以便其他進(jìn)程也能接收到結(jié)束信號?。?!
            data_queue.put((idx, df))
            return
        """處理數(shù)據(jù)的業(yè)務(wù)邏輯略過"""
        write_queue.put((idx, df))  # 將處理后的數(shù)據(jù)放入寫隊(duì)列
# 創(chuàng)建一個讀取數(shù)據(jù)的線程
read_pool = Thread(target=read_data, args=(read_data_queue, *args))
read_pool.start()  # 開啟讀取線程
# 創(chuàng)建一個增量寫入CSV數(shù)據(jù)的線程
write_pool = Thread(target=write_data, args=(write_data_queue, *args))
write_pool.start()  # 開啟寫進(jìn)程
pools = []  # 存放解析進(jìn)程的隊(duì)列
for i in range(cpu_count()):  # 循環(huán)開啟多進(jìn)程,不確定開多少個進(jìn)程合適的情況下,那么按CPU的核數(shù)開比較合理
    pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args))
    pool.start()  # 啟動進(jìn)程
    pools.append(pool)  # 加入隊(duì)列
for pool in pools:
    pool.join()  # 等待所有解析進(jìn)程完成
# 所有解析進(jìn)程完成后,在寫隊(duì)列放入結(jié)束寫線程的信號
write_data_queue.put((None, None))  
write_pool.join()  # 等待寫線程結(jié)束
print('任務(wù)完成')

2.優(yōu)化算法提高效率

將類對象存入dataframe列

在嘗試了n種方案之后,最終使用了將類對象存到dataframe的列中,使用map方法,運(yùn)行類方法,最后,將運(yùn)行結(jié)果展開到多列中的方式。該方案本項(xiàng)目中取得了最佳的處理效率。

"""鑒于保密需要,以下代碼僅為示例"""
class Obj:
    def __init__(self, ser: pd.Series):
        """
        初始化類對象
        :param ser: 傳入series
        """
        self.ser = ser  # 行數(shù)據(jù)
        self.attrs1 = []  # 屬性1
        self.attrs2 = []  # 屬性2
        self.attrs3 = []  # 屬性3
    def __repr__(self):
        """
        自定義輸出
        """
        attrs1 = '_'.join([str(a) for a in self.attrs1])
        attrs2 = '_'.join([str(a) for a in self.attrs2])
        attrs3 = '_'.join([str(a) for a in self.attrs3])
        return '_'.join([attrs1, attrs2, attrs3])
    def run(self):
        """運(yùn)行業(yè)務(wù)邏輯"""
# 創(chuàng)建obj列,存入類對象
data['obj'] = data.apply(lambda x: Obj(x), axis=1)
# 運(yùn)行obj列中的類方法獲得判斷結(jié)果
data['obj'] = data['obj'].map(lambda x: x.run())
# 鏈?zhǔn)秸{(diào)用,1將類對象文本化->2拆分到多列->3刪除空列->4轉(zhuǎn)換為category格式
data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category')
# 刪除obj列
data.drop(columns='obj', inplace=True)  

減少計(jì)算次數(shù)以提高運(yùn)行效率

在整個優(yōu)化過程中,對運(yùn)行效率產(chǎn)生最大優(yōu)化效果的有兩項(xiàng):

  • 一是改變遍歷算法,采用直接對整行數(shù)據(jù)進(jìn)行綜合判斷的方法,使原需要遍歷22個組合的計(jì)算與判斷大大減少
  • 二是提前計(jì)算特征組合,制作成字典,后續(xù)直接查詢結(jié)果,而不再進(jìn)行重復(fù)計(jì)算

使用numpy加速計(jì)算

numpy還是數(shù)據(jù)處理上的神器,使用numpy的方法,比自己實(shí)現(xiàn)的方法效率要高非常多,本項(xiàng)目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了運(yùn)行效率,又解決了邏輯判斷的問題:

"""numpy方法使用示例"""
import numpy as np
# 計(jì)算數(shù)字的個數(shù)組合bincount
np.bincount([9, 2, 13, 12, 9, 10, 11])
# 輸出結(jié)果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64)
# 取得個數(shù)最多的數(shù)字argmax
np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 輸出結(jié)果: 9
# 將數(shù)字按照個數(shù)優(yōu)先,其次大小進(jìn)行排序argsort
np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 輸出結(jié)果:array([ 0,  1,  3,  4,  5,  6,  7,  8,  2, 10, 11, 12, 13,  9], dtype=int64)
# 翻轉(zhuǎn)列表flipud
np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])))
# 輸出結(jié)果: array([ 9, 13, 12, 11, 10,  2,  8,  7,  6,  5,  4,  3,  1,  0], dtype=int64)
# 查找相同值in1d
np.in1d([2, 3, 4], [2, 9, 3])
# 輸出結(jié)果: array([ True,  True, False]) 注:指2,3True,4False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 輸出結(jié)果: array([ True,  True])
# 是否全是all
np.all(np.in1d([2, 3, 4], [2, 9, 3]))  # 判斷組合1是否包含在組合2中
# 輸出結(jié)果: False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 輸出結(jié)果: True

優(yōu)化前后的效率對比

總結(jié)

優(yōu)化算法是在這個項(xiàng)目上時間花費(fèi)最多的工作(沒有之一)。4月12日接單,10天左右出了第1稿,雖能運(yùn)行,但回頭看存在兩個問題:一是有bug需要修正,二是運(yùn)行效率不高(4500萬行數(shù)據(jù),執(zhí)行需要1小時21分鐘,如果只是在這個版本上debug需要增加判斷條件,效率只會更低);后20多天是在不斷的優(yōu)化算法的同時對bug進(jìn)行修正,最后版本執(zhí)行相同數(shù)據(jù)只需要不足30分鐘,效率提高了一倍多?;仡檨砜?,雖然調(diào)優(yōu)花費(fèi)的時間多,但是每一個嘗試不論成功還是失敗都是一次寶貴的經(jīng)驗(yàn)積累。

到此這篇關(guān)于Python詳解復(fù)雜CSV文件處理方法的文章就介紹到這了,更多相關(guān)Python CSV文件處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論