Python異步編程入門之實現(xiàn)文件批處理的并發(fā)處理方式
引言
在現(xiàn)代軟件開發(fā)中,處理大量文件或數(shù)據(jù)時,提高處理效率和并發(fā)性是非常重要的。
Python 的 asyncio 庫提供了一種強大的方式來實現(xiàn)異步編程,從而提高程序的并發(fā)處理能力。
本文將面向 Python 初級程序員,介紹如何使用 asyncio 和 logging 模塊來實現(xiàn)一個異步批處理文件的并發(fā)處理系統(tǒng)。
代碼實現(xiàn)
1. 日志配置
首先,我們需要配置日志系統(tǒng),以便在處理文件時記錄日志信息。
日志配置包括設置日志格式和輸出位置。
import logging
import os
# 獲取當前文件的絕對路徑
current_file = os.path.abspath(__file__)
# 配置日志格式
log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)
# 創(chuàng)建一個文件處理器,并將日志輸出到文件
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)2. 異步批處理類
接下來,我們定義一個 AsyncBatchProcessor 類,用于處理批量文件。
該類使用 asyncio.Semaphore 來控制并發(fā)任務的數(shù)量。
import asyncio
import random
DEFAULT_MAX_CONCURRENT_TASKS = 2 # 最大并發(fā)任務數(shù)
MAX_RETRIES = 3 # 最大重試次數(shù)
class AsyncBatchProcessor:
def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_single_file(
self,
input_file: str,
retry_count: int = 0
) -> None:
"""處理單個文件的異步方法"""
async with self.semaphore: # 使用信號量控制并發(fā)
try:
logging.info(f"Processing file: {input_file}")
# 模擬文件處理過程
await asyncio.sleep(random.uniform(0.5, 2.0))
logging.info(f"Successfully processed {input_file}")
except Exception as e:
logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}")
if retry_count < MAX_RETRIES:
logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})")
await asyncio.sleep(1)
await self.process_single_file(input_file, retry_count + 1)
else:
logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts")
async def process_batch(
self,
file_list: list
) -> None:
total_files = len(file_list)
logging.info(f"Found {total_files} files to process")
# 創(chuàng)建工作隊列
queue = asyncio.Queue()
# 將所有文件放入隊列
for file_path in file_list:
await queue.put(file_path)
# 創(chuàng)建工作協(xié)程
async def worker(worker_id: int):
while True:
try:
# 非阻塞方式獲取任務
input_file_path = await queue.get()
logging.info(f"Worker {worker_id} processing: {input_file_path}")
try:
await self.process_single_file(input_file_path)
except Exception as e:
logging.error(f"Error processing {input_file_path}: {str(e)}")
finally:
queue.task_done()
except asyncio.QueueEmpty:
# 隊列為空,工作結(jié)束
break
except Exception as e:
logging.error(f"Worker {worker_id} encountered error: {str(e)}")
break
# 創(chuàng)建工作任務
workers = []
for i in range(self.max_concurrent):
worker_task = asyncio.create_task(worker(i))
workers.append(worker_task)
# 等待隊列處理完成
await queue.join()
# 取消所有仍在運行的工作任務
for w in workers:
w.cancel()
# 等待所有工作任務完成
await asyncio.gather(*workers, return_exceptions=True)3. 異步批處理入口函數(shù)
最后,我們定義一個異步批處理入口函數(shù) batch_detect,用于啟動批處理任務。
async def batch_detect(
file_list: list,
max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
):
"""異步批處理入口函數(shù)"""
processor = AsyncBatchProcessor(max_concurrent)
await processor.process_batch(file_list)
# 示例調(diào)用
file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"]
asyncio.run(batch_detect(file_list))代碼解釋
1.日志配置:
- 使用
logging模塊記錄日志信息,包括時間、日志級別、文件路徑和行號、以及日志消息。 - 日志輸出到文件
app.log中,便于后續(xù)查看和分析。
2.異步批處理類 AsyncBatchProcessor:
__init__方法初始化最大并發(fā)任務數(shù)和信號量。process_single_file方法處理單個文件,使用信號量控制并發(fā),模擬文件處理過程,并在失敗時重試。process_batch方法處理批量文件,創(chuàng)建工作隊列和協(xié)程,控制并發(fā)任務的執(zhí)行。
3.異步批處理入口函數(shù) batch_detect:
- 創(chuàng)建
AsyncBatchProcessor實例,并調(diào)用process_batch方法啟動批處理任務。
總結(jié)
通過使用 asyncio 和 logging 模塊,我們實現(xiàn)了一個高效的異步批處理文件系統(tǒng)。
該系統(tǒng)能夠并發(fā)處理大量文件,并在處理失敗時自動重試,直到達到最大重試次數(shù)。
日志系統(tǒng)幫助我們記錄每個文件的處理過程,便于后續(xù)的調(diào)試和分析。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Python調(diào)用OpenCV實現(xiàn)圖像平滑代碼實例
這篇文章主要介紹了Python調(diào)用OpenCV實現(xiàn)圖像平滑代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06
詳解python opencv、scikit-image和PIL圖像處理庫比較
這篇文章主要介紹了詳解python opencv、scikit-image和PIL圖像處理庫比較,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-12-12
python grpc實現(xiàn)異步調(diào)用(不用grpc異步接口)
grpc同步調(diào)用更簡單,但是在處理復雜任務時,會導致請求阻塞,影響吞吐,本文主要介紹了python grpc實現(xiàn)異步調(diào)用,不用grpc異步接口,具有一定的參考價值,感興趣的可以了解一下2024-04-04

