欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python異步編程入門之實(shí)現(xiàn)文件批處理的并發(fā)處理方式

 更新時(shí)間:2024年10月28日 10:56:00   作者:engchina  
本文以Python初級(jí)程序員為對(duì)象,介紹了如何使用asyncio和logging模塊實(shí)現(xiàn)一個(gè)異步批處理文件的并發(fā)處理系統(tǒng),以提高處理大量文件或數(shù)據(jù)時(shí)的效率,其中,通過(guò)配置日志系統(tǒng)記錄處理文件的日志信息,定義AsyncBatchProcessor類控制并發(fā)任務(wù)的數(shù)量

引言

在現(xiàn)代軟件開發(fā)中,處理大量文件或數(shù)據(jù)時(shí),提高處理效率和并發(fā)性是非常重要的。

Python 的 asyncio 庫(kù)提供了一種強(qiáng)大的方式來(lái)實(shí)現(xiàn)異步編程,從而提高程序的并發(fā)處理能力。

本文將面向 Python 初級(jí)程序員,介紹如何使用 asynciologging 模塊來(lái)實(shí)現(xiàn)一個(gè)異步批處理文件的并發(fā)處理系統(tǒng)。

代碼實(shí)現(xiàn)

1. 日志配置

首先,我們需要配置日志系統(tǒng),以便在處理文件時(shí)記錄日志信息。

日志配置包括設(shè)置日志格式和輸出位置。

import logging
import os

# 獲取當(dāng)前文件的絕對(duì)路徑
current_file = os.path.abspath(__file__)

# 配置日志格式
log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)

# 創(chuàng)建一個(gè)文件處理器,并將日志輸出到文件
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)

2. 異步批處理類

接下來(lái),我們定義一個(gè) AsyncBatchProcessor 類,用于處理批量文件。

該類使用 asyncio.Semaphore 來(lái)控制并發(fā)任務(wù)的數(shù)量。

import asyncio
import random

DEFAULT_MAX_CONCURRENT_TASKS = 2  # 最大并發(fā)任務(wù)數(shù)
MAX_RETRIES = 3  # 最大重試次數(shù)

class AsyncBatchProcessor:
    def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_file(
            self,
            input_file: str,
            retry_count: int = 0
    ) -> None:
        """處理單個(gè)文件的異步方法"""
        async with self.semaphore:  # 使用信號(hào)量控制并發(fā)
            try:
                logging.info(f"Processing file: {input_file}")

                # 模擬文件處理過(guò)程
                await asyncio.sleep(random.uniform(0.5, 2.0))

                logging.info(f"Successfully processed {input_file}")

            except Exception as e:
                logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}")
                if retry_count < MAX_RETRIES:
                    logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})")
                    await asyncio.sleep(1)
                    await self.process_single_file(input_file, retry_count + 1)
                else:
                    logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts")

    async def process_batch(
            self,
            file_list: list
    ) -> None:
        total_files = len(file_list)
        logging.info(f"Found {total_files} files to process")

        # 創(chuàng)建工作隊(duì)列
        queue = asyncio.Queue()

        # 將所有文件放入隊(duì)列
        for file_path in file_list:
            await queue.put(file_path)

        # 創(chuàng)建工作協(xié)程
        async def worker(worker_id: int):
            while True:
                try:
                    # 非阻塞方式獲取任務(wù)
                    input_file_path = await queue.get()
                    logging.info(f"Worker {worker_id} processing: {input_file_path}")

                    try:
                        await self.process_single_file(input_file_path)
                    except Exception as e:
                        logging.error(f"Error processing {input_file_path}: {str(e)}")
                    finally:
                        queue.task_done()

                except asyncio.QueueEmpty:
                    # 隊(duì)列為空,工作結(jié)束
                    break
                except Exception as e:
                    logging.error(f"Worker {worker_id} encountered error: {str(e)}")
                    break

        # 創(chuàng)建工作任務(wù)
        workers = []
        for i in range(self.max_concurrent):
            worker_task = asyncio.create_task(worker(i))
            workers.append(worker_task)

        # 等待隊(duì)列處理完成
        await queue.join()

        # 取消所有仍在運(yùn)行的工作任務(wù)
        for w in workers:
            w.cancel()

        # 等待所有工作任務(wù)完成
        await asyncio.gather(*workers, return_exceptions=True)

3. 異步批處理入口函數(shù)

最后,我們定義一個(gè)異步批處理入口函數(shù) batch_detect,用于啟動(dòng)批處理任務(wù)。

async def batch_detect(
        file_list: list,
        max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
):
    """異步批處理入口函數(shù)"""
    processor = AsyncBatchProcessor(max_concurrent)
    await processor.process_batch(file_list)

# 示例調(diào)用
file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"]
asyncio.run(batch_detect(file_list))

代碼解釋

1.日志配置

  • 使用 logging 模塊記錄日志信息,包括時(shí)間、日志級(jí)別、文件路徑和行號(hào)、以及日志消息。
  • 日志輸出到文件 app.log 中,便于后續(xù)查看和分析。

2.異步批處理類 AsyncBatchProcessor

  • __init__ 方法初始化最大并發(fā)任務(wù)數(shù)和信號(hào)量。
  • process_single_file 方法處理單個(gè)文件,使用信號(hào)量控制并發(fā),模擬文件處理過(guò)程,并在失敗時(shí)重試。
  • process_batch 方法處理批量文件,創(chuàng)建工作隊(duì)列和協(xié)程,控制并發(fā)任務(wù)的執(zhí)行。

3.異步批處理入口函數(shù) batch_detect

  • 創(chuàng)建 AsyncBatchProcessor 實(shí)例,并調(diào)用 process_batch 方法啟動(dòng)批處理任務(wù)。

總結(jié)

通過(guò)使用 asynciologging 模塊,我們實(shí)現(xiàn)了一個(gè)高效的異步批處理文件系統(tǒng)。

該系統(tǒng)能夠并發(fā)處理大量文件,并在處理失敗時(shí)自動(dòng)重試,直到達(dá)到最大重試次數(shù)。

日志系統(tǒng)幫助我們記錄每個(gè)文件的處理過(guò)程,便于后續(xù)的調(diào)試和分析。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 使用python數(shù)據(jù)清洗代碼實(shí)例

    使用python數(shù)據(jù)清洗代碼實(shí)例

    這篇文章主要介紹了使用python數(shù)據(jù)清洗代碼實(shí)例,分享一下近期用python做數(shù)據(jù)清洗匯總的相關(guān)代碼,這里我們用到的python包有pandas、numpy、os等,需要的朋友可以參考下
    2023-07-07
  • Python調(diào)用OpenCV實(shí)現(xiàn)圖像平滑代碼實(shí)例

    Python調(diào)用OpenCV實(shí)現(xiàn)圖像平滑代碼實(shí)例

    這篇文章主要介紹了Python調(diào)用OpenCV實(shí)現(xiàn)圖像平滑代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • 詳解python opencv、scikit-image和PIL圖像處理庫(kù)比較

    詳解python opencv、scikit-image和PIL圖像處理庫(kù)比較

    這篇文章主要介紹了詳解python opencv、scikit-image和PIL圖像處理庫(kù)比較,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • 分析Python的Django框架的運(yùn)行方式及處理流程

    分析Python的Django框架的運(yùn)行方式及處理流程

    這篇文章主要介紹了分析Python的Django框架的運(yùn)行方式及處理流程,本文對(duì)于Django框架的機(jī)制總結(jié)得非常之直觀精煉,極力推薦!需要的朋友可以參考下
    2015-04-04
  • 基于Python編寫一個(gè)DOS命令輔助工具

    基于Python編寫一個(gè)DOS命令輔助工具

    在日常系統(tǒng)管理和維護(hù)工作中,執(zhí)行DOS(Disk?Operating?System)命令是一項(xiàng)必不可少的任務(wù),下面我們就來(lái)看看如何使用Python編寫一個(gè)簡(jiǎn)單的DOS命令輔助工具,簡(jiǎn)化系統(tǒng)管理任務(wù)吧
    2024-01-01
  • python grpc實(shí)現(xiàn)異步調(diào)用(不用grpc異步接口)

    python grpc實(shí)現(xiàn)異步調(diào)用(不用grpc異步接口)

    grpc同步調(diào)用更簡(jiǎn)單,但是在處理復(fù)雜任務(wù)時(shí),會(huì)導(dǎo)致請(qǐng)求阻塞,影響吞吐,本文主要介紹了python grpc實(shí)現(xiàn)異步調(diào)用,不用grpc異步接口,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-04-04
  • PyQt5 在label顯示的圖片中繪制矩形的方法

    PyQt5 在label顯示的圖片中繪制矩形的方法

    今天小編就為大家分享一篇PyQt5 在label顯示的圖片中繪制矩形的方法,具有很好的參考價(jià)值。希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-06-06
  • Python目錄和文件處理總結(jié)詳解

    Python目錄和文件處理總結(jié)詳解

    這篇文章主要介紹了Python目錄和文件處理總結(jié)詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • Python計(jì)算兩個(gè)日期相差天數(shù)的方法示例

    Python計(jì)算兩個(gè)日期相差天數(shù)的方法示例

    這篇文章主要介紹了Python計(jì)算兩個(gè)日期相差天數(shù)的方法,結(jié)合簡(jiǎn)單實(shí)例形式分析了Python日期時(shí)間的轉(zhuǎn)換與運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下
    2017-05-05
  • Python素?cái)?shù)檢測(cè)實(shí)例分析

    Python素?cái)?shù)檢測(cè)實(shí)例分析

    這篇文章主要介紹了Python素?cái)?shù)檢測(cè)方法,實(shí)例分析了Python判定素?cái)?shù)的相關(guān)技巧,需要的朋友可以參考下
    2015-06-06

最新評(píng)論