Python使用multiprocessing模塊實(shí)現(xiàn)多進(jìn)程并行計(jì)算
引言
Python 的 multiprocessing 模塊是一個(gè)標(biāo)準(zhǔn)庫(kù)模塊,用于實(shí)現(xiàn)多進(jìn)程并行計(jì)算。它通過(guò)創(chuàng)建獨(dú)立的進(jìn)程,繞過(guò) Python 的全局解釋器鎖(GIL),在多核 CPU 上實(shí)現(xiàn)真正的并行,特別適合 CPU 密集型任務(wù)(如數(shù)值計(jì)算、圖像處理)。相比線程(threading
模塊),multiprocessing
更適合需要高性能計(jì)算的場(chǎng)景。本文將詳細(xì)介紹 multiprocessing
模塊的定義、功能、用法、示例、應(yīng)用場(chǎng)景、最佳實(shí)踐和注意事項(xiàng)。
1. multiprocessing 模塊的定義和原理
1.1 定義
multiprocessing
是一個(gè)跨平臺(tái)的模塊,提供創(chuàng)建和管理進(jìn)程的 API,支持進(jìn)程間通信(IPC)、同步機(jī)制和共享資源管理。它模仿了 threading
模塊的接口,方便開(kāi)發(fā)者從線程遷移到進(jìn)程。
核心功能:
- 進(jìn)程創(chuàng)建:創(chuàng)建獨(dú)立進(jìn)程,運(yùn)行指定函數(shù)或任務(wù)。
- 進(jìn)程池:管理一組工作進(jìn)程,分配任務(wù)。
- 進(jìn)程通信:支持管道(
Pipe
)、隊(duì)列(Queue
)等 IPC 機(jī)制。 - 同步原語(yǔ):提供鎖(
Lock
)、信號(hào)量(Semaphore
)、事件(Event
)等。 - 共享內(nèi)存:支持共享基本數(shù)據(jù)類(lèi)型(
Value
)和數(shù)組(Array
)。 - 跨平臺(tái):在 Windows、Linux、macOS 上運(yùn)行一致。
依賴(lài):標(biāo)準(zhǔn)庫(kù),無(wú)需額外安裝。
1.2 原理
- 進(jìn)程 vs 線程:
- 進(jìn)程:獨(dú)立的內(nèi)存空間,擁有自己的 Python 解釋器和 GIL,適合 CPU 密集型任務(wù)。
- 線程:共享內(nèi)存空間,受 GIL 限制,適合 I/O 密集型任務(wù)。
- GIL 繞過(guò):每個(gè)進(jìn)程有獨(dú)立的 GIL,允許多核并行。
- 進(jìn)程創(chuàng)建:
- Linux/macOS:使用
fork
(復(fù)制父進(jìn)程),或spawn
(新進(jìn)程)。 - Windows:始終使用
spawn
,啟動(dòng)新解釋器。
- Linux/macOS:使用
- 通信開(kāi)銷(xiāo):進(jìn)程間通信(如
Queue
)比線程慢,需優(yōu)化設(shè)計(jì)。
1.3 導(dǎo)入
import multiprocessing
2. multiprocessing 的核心組件和功能
2.1 進(jìn)程創(chuàng)建(Process)
通過(guò) multiprocessing.Process
創(chuàng)建進(jìn)程,運(yùn)行指定函數(shù)。
構(gòu)造函數(shù):
Process(target=None, args=(), kwargs={}, name=None, daemon=None)
target
:目標(biāo)函數(shù)。args
/kwargs
:函數(shù)參數(shù)。name
:進(jìn)程名稱(chēng)。daemon
:是否為守護(hù)進(jìn)程(隨主進(jìn)程退出)。
主要方法:
start()
:?jiǎn)?dòng)進(jìn)程。join()
:等待進(jìn)程結(jié)束。terminate()
:強(qiáng)制終止進(jìn)程。is_alive()
:檢查進(jìn)程是否存活。
示例:
import multiprocessing def worker(num): print(f"Worker {num} running in process {multiprocessing.current_process().name}") if __name__ == "__main__": processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)] for p in processes: p.start() for p in processes: p.join()
輸出(順序可能不同):
Worker 0 running in process Process-1 Worker 1 running in process Process-2 Worker 2 running in process Process-3
- 說(shuō)明:創(chuàng)建 3 個(gè)進(jìn)程,每個(gè)運(yùn)行
worker
函數(shù)。
2.2 進(jìn)程池(Pool)
Pool
用于管理固定數(shù)量的進(jìn)程,適合并行處理大量任務(wù)。
構(gòu)造函數(shù):
Pool(processes=None, initializer=None, initargs=())
processes
:進(jìn)程數(shù)(默認(rèn) CPU 核心數(shù))。initializer
:每個(gè)進(jìn)程的初始化函數(shù)。initargs
:初始化函數(shù)參數(shù)。
主要方法:
map(func, iterable)
:并行執(zhí)行func
應(yīng)用于iterable
,返回結(jié)果列表。imap(func, iterable)
:惰性版本,返回迭代器。apply(func, args=(), kwds={})
:同步執(zhí)行單任務(wù)。apply_async(func, args=(), kwds={})
:異步執(zhí)行單任務(wù)。close()
:關(guān)閉池,禁止新任務(wù)。join()
:等待池內(nèi)進(jìn)程完成。
示例:
from multiprocessing import Pool def square(n): return n * n if __name__ == "__main__": with Pool(processes=4) as pool: results = pool.map(square, range(10)) print(results) # 輸出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.3 進(jìn)程通信
支持 Pipe
和 Queue
實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)交換。
Pipe
- 雙向或單向管道,適合兩個(gè)進(jìn)程通信。
構(gòu)造函數(shù):
Pipe(duplex=True)
- 返回
(conn1, conn2)
,兩個(gè)連接對(duì)象。 duplex=True
:雙向;False
:?jiǎn)蜗颉?/li>
示例:
from multiprocessing import Process, Pipe def sender(conn): conn.send("Hello from sender") conn.close() def receiver(conn): print(conn.recv()) conn.close() if __name__ == "__main__": parent_conn, child_conn = Pipe() p1 = Process(target=sender, args=(child_conn,)) p2 = Process(target=receiver, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join()
輸出:
Hello from sender
Queue
- 線程和進(jìn)程安全的隊(duì)列,適合多生產(chǎn)者/消費(fèi)者場(chǎng)景。
構(gòu)造函數(shù):
Queue(maxsize=0)
maxsize
:最大容量(0 表示無(wú)限制)。
示例:
from multiprocessing import Process, Queue def producer(queue): queue.put("Data from producer") def consumer(queue): print(queue.get()) if __name__ == "__main__": queue = Queue() p1 = Process(target=producer, args=(queue,)) p2 = Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()
2.4 同步機(jī)制
提供鎖、信號(hào)量等原語(yǔ),確保進(jìn)程安全訪問(wèn)共享資源。
Lock
- 互斥鎖,防止多個(gè)進(jìn)程同時(shí)訪問(wèn)資源。
- 示例:
from multiprocessing import Process, Lock def printer(lock, msg): with lock: print(msg) if __name__ == "__main__": lock = Lock() processes = [Process(target=printer, args=(lock, f"Message {i}")) for i in range(3)] for p in processes: p.start() for p in processes: p.join()
Semaphore
- 控制有限資源的并發(fā)訪問(wèn)。
- 示例:
from multiprocessing import Process, Semaphore def worker(sem, name): with sem: print(f"{name} acquired resource") # 模擬工作 if __name__ == "__main__": sem = Semaphore(2) # 允許 2 個(gè)進(jìn)程同時(shí)訪問(wèn) processes = [Process(target=worker, args=(sem, f"Worker {i}")) for i in range(5)] for p in processes: p.start() for p in processes: p.join()
Event
- 進(jìn)程間信號(hào)通知。
- 示例:
from multiprocessing import Process, Event import time def wait_for_event(event): event.wait() print("Event triggered") if __name__ == "__main__": event = Event() p = Process(target=wait_for_event, args=(event,)) p.start() time.sleep(1) event.set() # 觸發(fā)事件 p.join()
2.5 共享內(nèi)存
通過(guò) Value
和 Array
共享基本數(shù)據(jù)類(lèi)型。
- Value:?jiǎn)蝹€(gè)共享值。
- Array:共享數(shù)組。
示例:
from multiprocessing import Process, Value, Array def modify(shared_num, shared_arr): shared_num.value += 1 for i in range(len(shared_arr)): shared_arr[i] += 1 if __name__ == "__main__": num = Value("i", 0) # 共享整數(shù) arr = Array("i", [1, 2, 3]) # 共享數(shù)組 p = Process(target=modify, args=(num, arr)) p.start() p.join() print(num.value) # 輸出: 1 print(list(arr)) # 輸出: [2, 3, 4]
3. 應(yīng)用場(chǎng)景
數(shù)值計(jì)算:
- 并行處理矩陣運(yùn)算、蒙特卡洛模擬。
- 示例:計(jì)算大數(shù)組的平方。
圖像處理:
- 并行處理圖像濾波、特征提取。
- 示例:批量應(yīng)用卷積濾波。
機(jī)器學(xué)習(xí):
- 并行訓(xùn)練模型或處理數(shù)據(jù)預(yù)處理。
- 示例:并行特征提取。
數(shù)據(jù)處理:
- 并行處理 CSV 文件、數(shù)據(jù)庫(kù)查詢(xún)。
- 示例:多進(jìn)程解析日志文件。
爬蟲(chóng):
- 并行抓取網(wǎng)頁(yè)(注意網(wǎng)絡(luò)限制)。
- 示例:結(jié)合
urllib
并發(fā)下載。
4. 示例:多進(jìn)程爬蟲(chóng)
結(jié)合 urllib
和 Queue
實(shí)現(xiàn)并行網(wǎng)頁(yè)抓取。
示例:
import urllib.request from multiprocessing import Process, Queue from urllib.error import URLError def fetch_url(queue, url): try: with urllib.request.urlopen(url) as response: content = response.read().decode("utf-8") queue.put((url, len(content))) except URLError as e: queue.put((url, str(e))) def main(): urls = ["https://example.com", "https://python.org", "https://invalid-url"] queue = Queue() processes = [Process(target=fetch_url, args=(queue, url)) for url in urls] for p in processes: p.start() for p in processes: p.join() while not queue.empty(): url, result = queue.get() print(f"{url}: {result}") if __name__ == "__main__": main()
輸出(示例):
https://example.com: 1256 https://python.org: 50000 https://invalid-url: [Errno 11001] getaddrinfo failed
5. 最佳實(shí)踐
使用 if __name__ == "__main__":
:
- 防止 Windows 和某些 Unix 系統(tǒng)重復(fù)導(dǎo)入模塊。
示例:
if __name__ == "__main__": p = Process(target=worker) p.start()
選擇進(jìn)程池:
- 對(duì)于批量任務(wù),使用
Pool
簡(jiǎn)化管理。
示例:
with Pool(4) as pool: results = pool.map(func, data)
優(yōu)化通信:
- 盡量減少進(jìn)程間通信,使用共享內(nèi)存或批量傳遞數(shù)據(jù)。
示例:
arr = Array("i", [0] * size)
異常處理:
- 在子進(jìn)程中捕獲異常,通過(guò)
Queue
或日志返回。
示例:
def worker(queue): try: # 工作代碼 except Exception as e: queue.put(str(e))
測(cè)試代碼:
- 使用
pytest
測(cè)試多進(jìn)程行為。
示例:
import pytest from multiprocessing import Process def test_process(): def worker(): print("Test") p = Process(target=worker) p.start() p.join() assert p.exitcode == 0
進(jìn)程數(shù)選擇:
- 默認(rèn)使用 CPU 核心數(shù)(
multiprocessing.cpu_count()
)。
示例:
processes = min(len(tasks), multiprocessing.cpu_count())
6. 注意事項(xiàng)
GIL 限制:
multiprocessing
繞過(guò) GIL,適合 CPU 密集型任務(wù);I/O 密集型任務(wù)考慮threading
或asyncio
。
示例:
# I/O 密集型:使用 asyncio import asyncio async def fetch(): pass
Windows 兼容性:
- Windows 使用
spawn
,需確保代碼在if __name__ == "__main__":
中。
示例:
if __name__ == "__main__": main()
資源管理:
- 及時(shí)關(guān)閉進(jìn)程和池,釋放資源。
示例:
with Pool() as pool: pool.map(func, data)
序列化開(kāi)銷(xiāo):
- 傳遞大數(shù)據(jù)到子進(jìn)程(如通過(guò)
Queue
)可能慢,使用共享內(nèi)存。
示例:
shared_data = Value("d", 0.0)
調(diào)試難度:
- 子進(jìn)程錯(cuò)誤可能不易捕獲,使用日志或
Queue
返回錯(cuò)誤。
示例:
import logging logging.basicConfig(level=logging.INFO)
7. 總結(jié)
Python 的 multiprocessing
模塊是實(shí)現(xiàn)多進(jìn)程并行的強(qiáng)大工具,繞過(guò) GIL,適合 CPU 密集型任務(wù)。其核心特點(diǎn)包括:
- 定義:提供進(jìn)程創(chuàng)建、通信、同步和共享內(nèi)存的 API。
- 功能:支持
Process
、Pool
、Queue
、Pipe
、Lock
等。 - 應(yīng)用:數(shù)值計(jì)算、圖像處理、機(jī)器學(xué)習(xí)、數(shù)據(jù)處理、爬蟲(chóng)。
- 最佳實(shí)踐:使用
if __name__ == "__main__":
、優(yōu)化通信、測(cè)試代碼。
以上就是Python使用multiprocessing模塊實(shí)現(xiàn)多進(jìn)程并行計(jì)算的詳細(xì)內(nèi)容,更多關(guān)于Python multiprocessing多進(jìn)程并行計(jì)算的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python文件操作指南解鎖三個(gè)txt文件合并技術(shù)
本文將深入介紹如何利用Python編寫(xiě)腳本,將三個(gè)文本文件中指定的列數(shù)據(jù)合并成一個(gè)新文件,通過(guò)豐富的示例代碼和詳細(xì)解釋,幫助掌握這一實(shí)用而靈活的數(shù)據(jù)處理技巧2024-01-01Python中socket網(wǎng)絡(luò)通信是干嘛的
在本篇文章里小編給大家分享的是關(guān)于Python中socket網(wǎng)絡(luò)通信知識(shí)點(diǎn)內(nèi)容,需要的朋友們可以跟著學(xué)習(xí)下。2020-05-05在Pycharm中設(shè)置默認(rèn)自動(dòng)換行的方法
今天小編就為大家分享一篇在Pycharm中設(shè)置默認(rèn)自動(dòng)換行的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01如何通過(guò)Python實(shí)現(xiàn)一個(gè)消息隊(duì)列
這篇文章主要為大家詳細(xì)介紹了如何通過(guò)Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-02-02使用Python提取PDF文件中內(nèi)容的代碼示例和使用技巧
在文檔自動(dòng)化處理、數(shù)據(jù)提取和信息分析等任務(wù)中,從 PDF 文件中提取文本是一項(xiàng)常見(jiàn)需求,PDF 文件通常分為兩種類(lèi)型:基于文本的 PDF 和 包含掃描圖像的 PDF,本文將介紹如何使用 Python 分別提取這兩種類(lèi)型的 PDF 內(nèi)容,需要的朋友可以參考下2025-07-07Python基礎(chǔ)之賦值,淺拷貝,深拷貝的區(qū)別
這篇文章主要介紹了Python基礎(chǔ)之賦值,淺拷貝,深拷貝的區(qū)別,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python基礎(chǔ)的小伙伴們也有非常好的幫助,需要的朋友可以參考下2021-04-04極簡(jiǎn)Python庫(kù)CherryPy構(gòu)建高性能Web應(yīng)用實(shí)例探索
今天為大家介紹的是 CherryPy,它是一個(gè)極簡(jiǎn)、穩(wěn)定且功能強(qiáng)大的Web框架,可以幫助開(kāi)發(fā)者快速構(gòu)建高性能的 Web 應(yīng)用程序,使用 CherryPy,你可以輕松地創(chuàng)建RESTful API、靜態(tài)網(wǎng)站、異步任務(wù)和 WebSocket 等應(yīng)用2024-01-01對(duì)python的bytes類(lèi)型數(shù)據(jù)split分割切片方法
今天小編就為大家分享一篇對(duì)python的bytes類(lèi)型數(shù)據(jù)split分割切片方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12