Python并行處理實戰(zhàn)之如何使用ProcessPoolExecutor加速計算
簡介
在現(xiàn)代計算中,并行處理是提高程序性能的重要手段。Python提供了多種并行處理的方式,其中concurrent.futures
模塊的ProcessPoolExecutor
是一個非常強大且易于使用的工具。本文將通過一個實際示例,展示如何使用ProcessPoolExecutor
進(jìn)行并行處理,并詳細(xì)解釋代碼的工作原理。
完整代碼示例
import time import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List def process_numbers(chunk: List[int], factor: int) -> str: """ 處理數(shù)字的函數(shù),通過將它們乘以因子來模擬處理。 這個函數(shù)接受一個數(shù)字列表和一個因子,計算列表中每個數(shù)字乘以因子的和, 并返回結(jié)果字符串。 """ result = sum(x * factor for x in chunk) time.sleep(0.1) # 使用睡眠模擬工作 return f"處理的塊和: {result}" def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2): """ 演示并行處理的主函數(shù)。 這個函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、 將數(shù)字分成塊,并使用ProcessPoolExecutor進(jìn)行并行處理。 """ import logging logging.basicConfig(level=logging.INFO) _log = logging.getLogger(__name__) # 如果沒有提供數(shù)字,則生成示例列表 if numbers is None: numbers = list(range(1, 101)) # 生成1到100的數(shù)字 total_numbers = len(numbers) _log.info(f"開始并行處理 {total_numbers} 個數(shù)字") cpu_count = multiprocessing.cpu_count() _log.info(f"檢測到 {cpu_count} 個CPU核心") # 確定最佳工作進(jìn)程數(shù)量 optimal_workers = min(cpu_count, num_chunks) _log.info(f"使用 {optimal_workers} 個工作進(jìn)程") # 計算塊大小 chunk_size = max(1, total_numbers // optimal_workers) _log.info(f"每個塊包含 {chunk_size} 個數(shù)字") # 將數(shù)字分成塊 chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)] _log.info(f"總共生成了 {len(chunks)} 個塊") start_time = time.time() processed_count = 0 # 使用ProcessPoolExecutor進(jìn)行并行處理 with ProcessPoolExecutor(max_workers=optimal_workers) as executor: _log.info("啟動ProcessPoolExecutor") # 提交所有任務(wù) futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks] _log.info(f"提交了 {len(futures)} 個任務(wù)") # 等待完成并收集結(jié)果 for future in as_completed(futures): try: result = future.result() processed_count += 1 _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 總計)\n{'#'*50}") except Exception as e: _log.error(f"處理塊時出錯: {str(e)}") raise elapsed_time = time.time() - start_time _log.info(f"并行處理完成,耗時 {elapsed_time:.2f} 秒。") if __name__ == "__main__": # 使用數(shù)字列表的示例 main()
代碼解釋
1. 導(dǎo)入必要的模塊
import time import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List
這些模塊提供了我們需要的并行處理功能和類型提示。
2. 定義處理函數(shù)
def process_numbers(chunk: List[int], factor: int) -> str: """ 處理數(shù)字的函數(shù),通過將它們乘以因子來模擬處理。 這個函數(shù)接受一個數(shù)字列表和一個因子,計算列表中每個數(shù)字乘以因子的和, 并返回結(jié)果字符串。 """ result = sum(x * factor for x in chunk) time.sleep(0.1) # 使用睡眠模擬工作 return f"處理的塊和: {result}"
這個函數(shù)模擬了對數(shù)字列表的處理,通過將每個數(shù)字乘以一個因子并求和。time.sleep(0.1)
用于模擬實際工作。
3. 主函數(shù)
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2): """ 演示并行處理的主函數(shù)。 這個函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、 將數(shù)字分成塊,并使用ProcessPoolExecutor進(jìn)行并行處理。 """ import logging logging.basicConfig(level=logging.INFO) _log = logging.getLogger(__name__)
主函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、將數(shù)字分成塊,并使用ProcessPoolExecutor
進(jìn)行并行處理。
4. 生成數(shù)字列表
# 如果沒有提供數(shù)字,則生成示例列表 if numbers is None: numbers = list(range(1, 101)) # 生成1到100的數(shù)字
如果沒有提供數(shù)字列表,則生成1到100的數(shù)字列表。
5. 確定最佳工作進(jìn)程數(shù)量
cpu_count = multiprocessing.cpu_count() _log.info(f"檢測到 {cpu_count} 個CPU核心") # 確定最佳工作進(jìn)程數(shù)量 optimal_workers = min(cpu_count, num_chunks) _log.info(f"使用 {optimal_workers} 個工作進(jìn)程")
根據(jù)CPU核心數(shù)和用戶指定的塊數(shù),確定最佳工作進(jìn)程數(shù)量。
6. 將數(shù)字分成塊
# 計算塊大小 chunk_size = max(1, total_numbers // optimal_workers) _log.info(f"每個塊包含 {chunk_size} 個數(shù)字") # 將數(shù)字分成塊 chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)] _log.info(f"總共生成了 {len(chunks)} 個塊")
將數(shù)字列表分成多個塊,每個塊的大小根據(jù)總數(shù)和工作進(jìn)程數(shù)量計算。
7. 并行處理
start_time = time.time() processed_count = 0 # 使用ProcessPoolExecutor進(jìn)行并行處理 with ProcessPoolExecutor(max_workers=optimal_workers) as executor: _log.info("啟動ProcessPoolExecutor") # 提交所有任務(wù) futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks] _log.info(f"提交了 {len(futures)} 個任務(wù)") # 等待完成并收集結(jié)果 for future in as_completed(futures): try: result = future.result() processed_count += 1 _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 總計)\n{'#'*50}") except Exception as e: _log.error(f"處理塊時出錯: {str(e)}") raise
使用ProcessPoolExecutor
進(jìn)行并行處理,提交所有任務(wù)并等待完成。
8. 計算耗時
elapsed_time = time.time() - start_time _log.info(f"并行處理完成,耗時 {elapsed_time:.2f} 秒。")
計算并行處理的總耗時并輸出。
并行處理的基本概念和優(yōu)勢
并行處理是指同時執(zhí)行多個任務(wù),以提高程序的執(zhí)行效率。Python的concurrent.futures
模塊提供了一個高級接口,用于并行執(zhí)行任務(wù)。ProcessPoolExecutor
是其中一個重要的類,它使用多進(jìn)程來并行執(zhí)行任務(wù)。
并行處理的優(yōu)勢包括:
- 提高程序的執(zhí)行效率
- 充分利用多核CPU的計算能力
- 簡化多線程或多進(jìn)程編程的復(fù)雜性
如何運行和測試這個示例
- 將上述代碼保存為
parallel_processing_example.py
文件。 - 確保你的Python環(huán)境中安裝了必要的模塊(本示例不需要額外安裝模塊)。
- 在終端或命令行中運行以下命令:
python parallel_processing_example.py
你將看到程序的執(zhí)行過程和并行處理的結(jié)果。
總結(jié)
通過這個示例,我們展示了如何使用Python的ProcessPoolExecutor
進(jìn)行并行處理。并行處理是提高程序性能的重要手段,特別是在處理大量數(shù)據(jù)或計算密集型任務(wù)時。希望這個示例能幫助你更好地理解并行處理的概念和實現(xiàn)。
到此這篇關(guān)于Python并行處理實戰(zhàn)之如何使用ProcessPoolExecutor加速計算的文章就介紹到這了,更多相關(guān)Python ProcessPoolExecutor加速計算內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python sqlalchemy動態(tài)修改tablename兩種實現(xiàn)方式
這篇文章主要介紹了python sqlalchemy動態(tài)修改tablename兩種實現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-03-03PyQt6/PySide6中QTreeView類的實現(xiàn)
QTreeView是PyQt6或PySide6庫中用于顯示分層數(shù)據(jù)的控件,本文主要介紹了PyQt6/PySide6中QTreeView類的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2025-04-04Python數(shù)據(jù)合并的concat函數(shù)與merge函數(shù)詳解
大家都知道concat()函數(shù)可以沿著一條軸將多個對象進(jìn)行堆疊,其使用方式類似數(shù)據(jù)庫中的數(shù)據(jù)表合并,在使用merge()函數(shù)進(jìn)行合并時,默認(rèn)會使用重疊的列索引做為合并鍵,即取行索引重疊的部分,本文給大家介紹python?數(shù)據(jù)合并concat函數(shù)與merge函數(shù),感興趣的朋友一起看看吧2022-05-05