Python異步編程入門之實(shí)現(xiàn)文件批處理的并發(fā)處理方式
引言
在現(xiàn)代軟件開發(fā)中,處理大量文件或數(shù)據(jù)時(shí),提高處理效率和并發(fā)性是非常重要的。
Python 的 asyncio
庫(kù)提供了一種強(qiáng)大的方式來(lái)實(shí)現(xiàn)異步編程,從而提高程序的并發(fā)處理能力。
本文將面向 Python 初級(jí)程序員,介紹如何使用 asyncio
和 logging
模塊來(lái)實(shí)現(xiàn)一個(gè)異步批處理文件的并發(fā)處理系統(tǒng)。
代碼實(shí)現(xiàn)
1. 日志配置
首先,我們需要配置日志系統(tǒng),以便在處理文件時(shí)記錄日志信息。
日志配置包括設(shè)置日志格式和輸出位置。
import logging import os # 獲取當(dāng)前文件的絕對(duì)路徑 current_file = os.path.abspath(__file__) # 配置日志格式 log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s' logging.basicConfig(format=log_format, level=logging.INFO) # 創(chuàng)建一個(gè)文件處理器,并將日志輸出到文件 file_handler = logging.FileHandler('app.log') file_handler.setFormatter(logging.Formatter(log_format)) logging.getLogger().addHandler(file_handler)
2. 異步批處理類
接下來(lái),我們定義一個(gè) AsyncBatchProcessor
類,用于處理批量文件。
該類使用 asyncio.Semaphore
來(lái)控制并發(fā)任務(wù)的數(shù)量。
import asyncio import random DEFAULT_MAX_CONCURRENT_TASKS = 2 # 最大并發(fā)任務(wù)數(shù) MAX_RETRIES = 3 # 最大重試次數(shù) class AsyncBatchProcessor: def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS): self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def process_single_file( self, input_file: str, retry_count: int = 0 ) -> None: """處理單個(gè)文件的異步方法""" async with self.semaphore: # 使用信號(hào)量控制并發(fā) try: logging.info(f"Processing file: {input_file}") # 模擬文件處理過(guò)程 await asyncio.sleep(random.uniform(0.5, 2.0)) logging.info(f"Successfully processed {input_file}") except Exception as e: logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}") if retry_count < MAX_RETRIES: logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})") await asyncio.sleep(1) await self.process_single_file(input_file, retry_count + 1) else: logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts") async def process_batch( self, file_list: list ) -> None: total_files = len(file_list) logging.info(f"Found {total_files} files to process") # 創(chuàng)建工作隊(duì)列 queue = asyncio.Queue() # 將所有文件放入隊(duì)列 for file_path in file_list: await queue.put(file_path) # 創(chuàng)建工作協(xié)程 async def worker(worker_id: int): while True: try: # 非阻塞方式獲取任務(wù) input_file_path = await queue.get() logging.info(f"Worker {worker_id} processing: {input_file_path}") try: await self.process_single_file(input_file_path) except Exception as e: logging.error(f"Error processing {input_file_path}: {str(e)}") finally: queue.task_done() except asyncio.QueueEmpty: # 隊(duì)列為空,工作結(jié)束 break except Exception as e: logging.error(f"Worker {worker_id} encountered error: {str(e)}") break # 創(chuàng)建工作任務(wù) workers = [] for i in range(self.max_concurrent): worker_task = asyncio.create_task(worker(i)) workers.append(worker_task) # 等待隊(duì)列處理完成 await queue.join() # 取消所有仍在運(yùn)行的工作任務(wù) for w in workers: w.cancel() # 等待所有工作任務(wù)完成 await asyncio.gather(*workers, return_exceptions=True)
3. 異步批處理入口函數(shù)
最后,我們定義一個(gè)異步批處理入口函數(shù) batch_detect
,用于啟動(dòng)批處理任務(wù)。
async def batch_detect( file_list: list, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS ): """異步批處理入口函數(shù)""" processor = AsyncBatchProcessor(max_concurrent) await processor.process_batch(file_list) # 示例調(diào)用 file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"] asyncio.run(batch_detect(file_list))
代碼解釋
1.日志配置:
- 使用
logging
模塊記錄日志信息,包括時(shí)間、日志級(jí)別、文件路徑和行號(hào)、以及日志消息。 - 日志輸出到文件
app.log
中,便于后續(xù)查看和分析。
2.異步批處理類 AsyncBatchProcessor
:
__init__
方法初始化最大并發(fā)任務(wù)數(shù)和信號(hào)量。process_single_file
方法處理單個(gè)文件,使用信號(hào)量控制并發(fā),模擬文件處理過(guò)程,并在失敗時(shí)重試。process_batch
方法處理批量文件,創(chuàng)建工作隊(duì)列和協(xié)程,控制并發(fā)任務(wù)的執(zhí)行。
3.異步批處理入口函數(shù) batch_detect
:
- 創(chuàng)建
AsyncBatchProcessor
實(shí)例,并調(diào)用process_batch
方法啟動(dòng)批處理任務(wù)。
總結(jié)
通過(guò)使用 asyncio
和 logging
模塊,我們實(shí)現(xiàn)了一個(gè)高效的異步批處理文件系統(tǒng)。
該系統(tǒng)能夠并發(fā)處理大量文件,并在處理失敗時(shí)自動(dòng)重試,直到達(dá)到最大重試次數(shù)。
日志系統(tǒng)幫助我們記錄每個(gè)文件的處理過(guò)程,便于后續(xù)的調(diào)試和分析。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
使用python數(shù)據(jù)清洗代碼實(shí)例
這篇文章主要介紹了使用python數(shù)據(jù)清洗代碼實(shí)例,分享一下近期用python做數(shù)據(jù)清洗匯總的相關(guān)代碼,這里我們用到的python包有pandas、numpy、os等,需要的朋友可以參考下2023-07-07Python調(diào)用OpenCV實(shí)現(xiàn)圖像平滑代碼實(shí)例
這篇文章主要介紹了Python調(diào)用OpenCV實(shí)現(xiàn)圖像平滑代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06詳解python opencv、scikit-image和PIL圖像處理庫(kù)比較
這篇文章主要介紹了詳解python opencv、scikit-image和PIL圖像處理庫(kù)比較,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12分析Python的Django框架的運(yùn)行方式及處理流程
這篇文章主要介紹了分析Python的Django框架的運(yùn)行方式及處理流程,本文對(duì)于Django框架的機(jī)制總結(jié)得非常之直觀精煉,極力推薦!需要的朋友可以參考下2015-04-04python grpc實(shí)現(xiàn)異步調(diào)用(不用grpc異步接口)
grpc同步調(diào)用更簡(jiǎn)單,但是在處理復(fù)雜任務(wù)時(shí),會(huì)導(dǎo)致請(qǐng)求阻塞,影響吞吐,本文主要介紹了python grpc實(shí)現(xiàn)異步調(diào)用,不用grpc異步接口,具有一定的參考價(jià)值,感興趣的可以了解一下2024-04-04Python計(jì)算兩個(gè)日期相差天數(shù)的方法示例
這篇文章主要介紹了Python計(jì)算兩個(gè)日期相差天數(shù)的方法,結(jié)合簡(jiǎn)單實(shí)例形式分析了Python日期時(shí)間的轉(zhuǎn)換與運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-05-05Python素?cái)?shù)檢測(cè)實(shí)例分析
這篇文章主要介紹了Python素?cái)?shù)檢測(cè)方法,實(shí)例分析了Python判定素?cái)?shù)的相關(guān)技巧,需要的朋友可以參考下2015-06-06