Python大數(shù)據(jù)量文本文件高效解析方案代碼實現(xiàn)全過程
測試環(huán)境
Python 3.6.2
Win 10 內(nèi)存 8G,CPU I5 1.6 GHz
背景描述
這個作品來源于一個日志解析工具的開發(fā),這個開發(fā)過程中遇到的一個痛點,就是日志文件多,日志數(shù)據(jù)量大,解析耗時長。在這種情況下,尋思一種高效解析數(shù)據(jù)解析方案。
解決方案描述
1、采用多線程讀取文件
2、采用按塊讀取文件替代按行讀取文件
由于日志文件都是文本文件,需要讀取其中每一行進行解析,所以一開始會很自然想到采用按行讀取,后面發(fā)現(xiàn)合理配置下,按塊讀取,會比按行讀取更高效。
按塊讀取來的問題就是,可能導致完整的數(shù)據(jù)行分散在不同數(shù)據(jù)塊中,那怎么解決這個問題呢?解答如下:
將數(shù)據(jù)塊按換行符\n
切分得到日志行列表,列表第一個元素可能是一個完整的日志行,也可能是上一個數(shù)據(jù)塊末尾日志行的組成部分,列表最后一個元素可能是不完整的日志行(即下一個數(shù)據(jù)塊開頭日志行的組成部分),也可能是空字符串(日志塊中的日志行數(shù)據(jù)全部是完整的),根據(jù)這個規(guī)律,得出以下公式,通過該公式,可以得到一個新的數(shù)據(jù)塊,對該數(shù)據(jù)塊二次切分,可以得到數(shù)據(jù)完整的日志行
上一個日志塊首部日志行 +\n + 尾部日志行 + 下一個數(shù)據(jù)塊首部日志行 + \n + 尾部日志行 + ...
3、將數(shù)據(jù)解析操作拆分為可并行解析部分和不可并行解析部分
數(shù)據(jù)解析往往涉及一些不可并行的操作,比如數(shù)據(jù)求和,最值統(tǒng)計等,如果不進行拆分,并行解析時勢必需要添加互斥鎖,避免數(shù)據(jù)覆蓋,這樣就會大大降低執(zhí)行的效率,特別是不可并行操作占比較大的情況下。
對數(shù)據(jù)解析操作進行拆分后,可并行解析操作部分不用加鎖??紤]到Python GIL的問題,不可并行解析部分替換為單進程解析。
4、采用多進程解析替代多線程解析
采用多進程解析替代多線程解析,可以避開Python GIL全局解釋鎖帶來的執(zhí)行效率問題,從而提高解析效率。
5、采用隊列實現(xiàn)“協(xié)同”效果
引入隊列機制,實現(xiàn)一邊讀取日志,一邊進行數(shù)據(jù)解析:
- 日志讀取線程將日志塊存儲到隊列,解析進程從隊列獲取已讀取日志塊,執(zhí)行可并行解析操作
- 并行解析操作進程將解析后的結(jié)果存儲到另一個隊列,另一個解析進程從隊列獲取數(shù)據(jù),執(zhí)行不可并行解析操作。
代碼實現(xiàn)
#!/usr/bin/env python # -*- coding:utf-8 -*- import re import time from datetime import datetime from joblib import Parallel, delayed, parallel_backend from collections import deque from multiprocessing import cpu_count import threading class LogParser(object): def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()): self.log_unparsed_queue = deque() # 用于存儲未解析日志 self.log_line_parsed_queue = deque() # 用于存儲已解析日志行 self.is_all_files_read = False # 標識是否已讀取所有日志文件 self.process_num_for_log_parsing = process_num_for_log_parsing # 并發(fā)解析日志文件進程數(shù) self.chunk_size = chunk_size # 每次讀取日志的日志塊大小 self.files_read_list = [] # 存放已讀取日志文件 self.log_parsing_finished = False # 標識是否完成日志解析 def read_in_chunks(self, filePath, chunk_size=1024*1024): """ 惰性函數(shù)(生成器),用于逐塊讀取文件。 默認區(qū)塊大?。?M """ with open(filePath, 'r', encoding='utf-8') as f: while True: chunk_data = f.read(chunk_size) if not chunk_data: break yield chunk_data def read_log_file(self, logfile_path): ''' 讀取日志文件 這里假設日志文件都是文本文件,按塊讀取后,可按換行符進行二次切分,以便獲取行日志 ''' temp_list = [] # 二次切分后,頭,尾行日志可能是不完整的,所以需要將日志塊頭尾行日志相連接,進行拼接 for chunk in self.read_in_chunks(logfile_path, self.chunk_size): log_chunk = chunk.split('\n') temp_list.extend([log_chunk[0], '\n']) temp_list.append(log_chunk[-1]) self.log_unparsed_queue.append(log_chunk[1:-1]) self.log_unparsed_queue.append(''.join(temp_list).split('\n')) self.files_read_list.remove(logfile_path) def start_processes_for_log_parsing(self): '''啟動日志解析進程''' with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing): Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing)) self.log_parsing_finished = True def parse_logs(self): '''解析日志''' method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL) url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL) while self.log_unparsed_queue or self.files_read_list: if not self.log_unparsed_queue: continue log_line_list = self.log_unparsed_queue.popleft() for log_line in log_line_list: #### do something with log_line if not log_line.strip(): continue res = method_url_re_pattern.findall(log_line) if not res: print('日志未匹配到請求URL,已忽略:\n%s' % log_line) continue method = res[0][0] url = res[0][1].split('?')[0] # 去掉了 ?及后面的url參數(shù) # 提取耗時 res = url_time_taken_extractor.findall(log_line) if res: time_taken = float(res[0]) else: print('未從日志提取到請求耗時,已忽略日志:\n%s' % log_line) continue # 存儲解析后的日志信息 self.log_line_parsed_queue.append({'method': method, 'url': url, 'time_taken': time_taken, }) def collect_statistics(self): '''收集統(tǒng)計數(shù)據(jù)''' def _collect_statistics(): while self.log_line_parsed_queue or not self.log_parsing_finished: if not self.log_line_parsed_queue: continue log_info = self.log_line_parsed_queue.popleft() # do something with log_info with parallel_backend("multiprocessing", n_jobs=1): Parallel()(delayed(_collect_statistics)() for i in range(1)) def run(self, file_path_list): # 多線程讀取日志文件 for file_path in file_path_list: thread = threading.Thread(target=self.read_log_file, name="read_log_file", args=(file_path,)) thread.start() self.files_read_list.append(file_path) # 啟動日志解析進程 thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing") thread.start() # 啟動日志統(tǒng)計數(shù)據(jù)收集進程 thread = threading.Thread(target=self.collect_statistics, name="collect_statistics") thread.start() start = datetime.now() while threading.active_count() > 1: print('程序正在努力解析日志...') time.sleep(0.5) end = datetime.now() print('解析完成', 'start', start, 'end', end, '耗時', end - start) if __name__ == "__main__": log_parser = LogParser() log_parser.run(['access.log', 'access2.log'])
注意:
需要合理的配置單次讀取文件數(shù)據(jù)塊的大小,不能過大,或者過小,否則都可能會導致數(shù)據(jù)讀取速度變慢。筆者實踐環(huán)境下,發(fā)現(xiàn)10M~15M每次是一個比較高效的配置。
總結(jié)
到此這篇關于Python大數(shù)據(jù)量文本文件高效解析方案代碼實現(xiàn)的文章就介紹到這了,更多相關Python大數(shù)據(jù)量文本文件解析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
音頻處理 windows10下python三方庫librosa安裝教程
這篇文章主要介紹了音頻處理 windows10下python三方庫librosa安裝方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-06-06