Python并行處理實(shí)戰(zhàn)之如何使用ProcessPoolExecutor加速計(jì)算
簡介
在現(xiàn)代計(jì)算中,并行處理是提高程序性能的重要手段。Python提供了多種并行處理的方式,其中concurrent.futures模塊的ProcessPoolExecutor是一個(gè)非常強(qiáng)大且易于使用的工具。本文將通過一個(gè)實(shí)際示例,展示如何使用ProcessPoolExecutor進(jìn)行并行處理,并詳細(xì)解釋代碼的工作原理。
完整代碼示例
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
"""
處理數(shù)字的函數(shù),通過將它們乘以因子來模擬處理。
這個(gè)函數(shù)接受一個(gè)數(shù)字列表和一個(gè)因子,計(jì)算列表中每個(gè)數(shù)字乘以因子的和,
并返回結(jié)果字符串。
"""
result = sum(x * factor for x in chunk)
time.sleep(0.1) # 使用睡眠模擬工作
return f"處理的塊和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
"""
演示并行處理的主函數(shù)。
這個(gè)函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、
將數(shù)字分成塊,并使用ProcessPoolExecutor進(jìn)行并行處理。
"""
import logging
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)
# 如果沒有提供數(shù)字,則生成示例列表
if numbers is None:
numbers = list(range(1, 101)) # 生成1到100的數(shù)字
total_numbers = len(numbers)
_log.info(f"開始并行處理 {total_numbers} 個(gè)數(shù)字")
cpu_count = multiprocessing.cpu_count()
_log.info(f"檢測到 {cpu_count} 個(gè)CPU核心")
# 確定最佳工作進(jìn)程數(shù)量
optimal_workers = min(cpu_count, num_chunks)
_log.info(f"使用 {optimal_workers} 個(gè)工作進(jìn)程")
# 計(jì)算塊大小
chunk_size = max(1, total_numbers // optimal_workers)
_log.info(f"每個(gè)塊包含 {chunk_size} 個(gè)數(shù)字")
# 將數(shù)字分成塊
chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
_log.info(f"總共生成了 {len(chunks)} 個(gè)塊")
start_time = time.time()
processed_count = 0
# 使用ProcessPoolExecutor進(jìn)行并行處理
with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
_log.info("啟動(dòng)ProcessPoolExecutor")
# 提交所有任務(wù)
futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
_log.info(f"提交了 {len(futures)} 個(gè)任務(wù)")
# 等待完成并收集結(jié)果
for future in as_completed(futures):
try:
result = future.result()
processed_count += 1
_log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 總計(jì))\n{'#'*50}")
except Exception as e:
_log.error(f"處理塊時(shí)出錯(cuò): {str(e)}")
raise
elapsed_time = time.time() - start_time
_log.info(f"并行處理完成,耗時(shí) {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
# 使用數(shù)字列表的示例
main()代碼解釋
1. 導(dǎo)入必要的模塊
import time import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List
這些模塊提供了我們需要的并行處理功能和類型提示。
2. 定義處理函數(shù)
def process_numbers(chunk: List[int], factor: int) -> str:
"""
處理數(shù)字的函數(shù),通過將它們乘以因子來模擬處理。
這個(gè)函數(shù)接受一個(gè)數(shù)字列表和一個(gè)因子,計(jì)算列表中每個(gè)數(shù)字乘以因子的和,
并返回結(jié)果字符串。
"""
result = sum(x * factor for x in chunk)
time.sleep(0.1) # 使用睡眠模擬工作
return f"處理的塊和: {result}"這個(gè)函數(shù)模擬了對數(shù)字列表的處理,通過將每個(gè)數(shù)字乘以一個(gè)因子并求和。time.sleep(0.1)用于模擬實(shí)際工作。
3. 主函數(shù)
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
"""
演示并行處理的主函數(shù)。
這個(gè)函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、
將數(shù)字分成塊,并使用ProcessPoolExecutor進(jìn)行并行處理。
"""
import logging
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)主函數(shù)負(fù)責(zé)設(shè)置日志記錄、生成數(shù)字列表、確定最佳工作進(jìn)程數(shù)量、將數(shù)字分成塊,并使用ProcessPoolExecutor進(jìn)行并行處理。
4. 生成數(shù)字列表
# 如果沒有提供數(shù)字,則生成示例列表
if numbers is None:
numbers = list(range(1, 101)) # 生成1到100的數(shù)字如果沒有提供數(shù)字列表,則生成1到100的數(shù)字列表。
5. 確定最佳工作進(jìn)程數(shù)量
cpu_count = multiprocessing.cpu_count()
_log.info(f"檢測到 {cpu_count} 個(gè)CPU核心")
# 確定最佳工作進(jìn)程數(shù)量
optimal_workers = min(cpu_count, num_chunks)
_log.info(f"使用 {optimal_workers} 個(gè)工作進(jìn)程")根據(jù)CPU核心數(shù)和用戶指定的塊數(shù),確定最佳工作進(jìn)程數(shù)量。
6. 將數(shù)字分成塊
# 計(jì)算塊大小
chunk_size = max(1, total_numbers // optimal_workers)
_log.info(f"每個(gè)塊包含 {chunk_size} 個(gè)數(shù)字")
# 將數(shù)字分成塊
chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
_log.info(f"總共生成了 {len(chunks)} 個(gè)塊")將數(shù)字列表分成多個(gè)塊,每個(gè)塊的大小根據(jù)總數(shù)和工作進(jìn)程數(shù)量計(jì)算。
7. 并行處理
start_time = time.time()
processed_count = 0
# 使用ProcessPoolExecutor進(jìn)行并行處理
with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
_log.info("啟動(dòng)ProcessPoolExecutor")
# 提交所有任務(wù)
futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
_log.info(f"提交了 {len(futures)} 個(gè)任務(wù)")
# 等待完成并收集結(jié)果
for future in as_completed(futures):
try:
result = future.result()
processed_count += 1
_log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 總計(jì))\n{'#'*50}")
except Exception as e:
_log.error(f"處理塊時(shí)出錯(cuò): {str(e)}")
raise使用ProcessPoolExecutor進(jìn)行并行處理,提交所有任務(wù)并等待完成。
8. 計(jì)算耗時(shí)
elapsed_time = time.time() - start_time
_log.info(f"并行處理完成,耗時(shí) {elapsed_time:.2f} 秒。")計(jì)算并行處理的總耗時(shí)并輸出。
并行處理的基本概念和優(yōu)勢
并行處理是指同時(shí)執(zhí)行多個(gè)任務(wù),以提高程序的執(zhí)行效率。Python的concurrent.futures模塊提供了一個(gè)高級接口,用于并行執(zhí)行任務(wù)。ProcessPoolExecutor是其中一個(gè)重要的類,它使用多進(jìn)程來并行執(zhí)行任務(wù)。
并行處理的優(yōu)勢包括:
- 提高程序的執(zhí)行效率
- 充分利用多核CPU的計(jì)算能力
- 簡化多線程或多進(jìn)程編程的復(fù)雜性
如何運(yùn)行和測試這個(gè)示例
- 將上述代碼保存為
parallel_processing_example.py文件。 - 確保你的Python環(huán)境中安裝了必要的模塊(本示例不需要額外安裝模塊)。
- 在終端或命令行中運(yùn)行以下命令:
python parallel_processing_example.py
你將看到程序的執(zhí)行過程和并行處理的結(jié)果。
總結(jié)
通過這個(gè)示例,我們展示了如何使用Python的ProcessPoolExecutor進(jìn)行并行處理。并行處理是提高程序性能的重要手段,特別是在處理大量數(shù)據(jù)或計(jì)算密集型任務(wù)時(shí)。希望這個(gè)示例能幫助你更好地理解并行處理的概念和實(shí)現(xiàn)。
到此這篇關(guān)于Python并行處理實(shí)戰(zhàn)之如何使用ProcessPoolExecutor加速計(jì)算的文章就介紹到這了,更多相關(guān)Python ProcessPoolExecutor加速計(jì)算內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python sqlalchemy動(dòng)態(tài)修改tablename兩種實(shí)現(xiàn)方式
這篇文章主要介紹了python sqlalchemy動(dòng)態(tài)修改tablename兩種實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-03-03
Python實(shí)現(xiàn)概率分布公式及可視化詳解
在機(jī)器學(xué)習(xí)或者深度學(xué)習(xí)課題里,時(shí)常要頻繁地使用統(tǒng)計(jì)概率的理論來輔助進(jìn)行數(shù)據(jù)處理與研究,所以本文我們就來聊聊Python實(shí)現(xiàn)概率分布公式及可視化的相關(guān)知識吧2025-05-05
Python新版極驗(yàn)驗(yàn)證碼識別驗(yàn)證碼教程詳解
這篇文章主要介紹了Python新版極驗(yàn)驗(yàn)證碼識別驗(yàn)證碼,極驗(yàn)驗(yàn)證是一種在計(jì)算機(jī)領(lǐng)域用于區(qū)分自然人和機(jī)器人的,通過簡單集成的方式,為開發(fā)者提供安全、便捷的云端驗(yàn)證服務(wù)2023-02-02
PyQt6/PySide6中QTreeView類的實(shí)現(xiàn)
QTreeView是PyQt6或PySide6庫中用于顯示分層數(shù)據(jù)的控件,本文主要介紹了PyQt6/PySide6中QTreeView類的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2025-04-04
Python數(shù)據(jù)合并的concat函數(shù)與merge函數(shù)詳解
大家都知道concat()函數(shù)可以沿著一條軸將多個(gè)對象進(jìn)行堆疊,其使用方式類似數(shù)據(jù)庫中的數(shù)據(jù)表合并,在使用merge()函數(shù)進(jìn)行合并時(shí),默認(rèn)會(huì)使用重疊的列索引做為合并鍵,即取行索引重疊的部分,本文給大家介紹python?數(shù)據(jù)合并concat函數(shù)與merge函數(shù),感興趣的朋友一起看看吧2022-05-05
Python中驚艷的一行代碼簡潔強(qiáng)大表達(dá)力技巧實(shí)例
在Python中,語言的設(shè)計(jì)理念之一是簡潔優(yōu)雅,這使得我們能夠用一行代碼完成一些令人驚嘆的任務(wù),本文將分享一些在一行代碼中展現(xiàn)出Python強(qiáng)大表達(dá)力的示例,涵蓋各種領(lǐng)域的實(shí)用技巧2024-01-01
python的time模塊和datetime模塊實(shí)例解析
這篇文章主要介紹了python的time模塊和datetime模塊實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11

