欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python使用multiprocessing模塊實(shí)現(xiàn)多進(jìn)程并行計(jì)算

 更新時(shí)間:2025年07月21日 09:44:51   作者:彬彬俠  
Python的multiprocessing模塊是一個(gè)標(biāo)準(zhǔn)庫(kù)模塊,用于實(shí)現(xiàn)多進(jìn)程并行計(jì)算,相比線程(threading 模塊),multiprocessing更適合需要高性能計(jì)算的場(chǎng)景,本文將詳細(xì)介紹multiprocessing模塊的定義、功能、用法、示例、應(yīng)用場(chǎng)景、最佳實(shí)踐和注意事項(xiàng),需要的朋友可以參考下

引言

Python 的 multiprocessing 模塊是一個(gè)標(biāo)準(zhǔn)庫(kù)模塊,用于實(shí)現(xiàn)多進(jìn)程并行計(jì)算。它通過(guò)創(chuàng)建獨(dú)立的進(jìn)程,繞過(guò) Python 的全局解釋器鎖(GIL),在多核 CPU 上實(shí)現(xiàn)真正的并行,特別適合 CPU 密集型任務(wù)(如數(shù)值計(jì)算、圖像處理)。相比線程(threading 模塊),multiprocessing 更適合需要高性能計(jì)算的場(chǎng)景。本文將詳細(xì)介紹 multiprocessing 模塊的定義、功能、用法、示例、應(yīng)用場(chǎng)景、最佳實(shí)踐和注意事項(xiàng)。

1. multiprocessing 模塊的定義和原理

1.1 定義

multiprocessing 是一個(gè)跨平臺(tái)的模塊,提供創(chuàng)建和管理進(jìn)程的 API,支持進(jìn)程間通信(IPC)、同步機(jī)制和共享資源管理。它模仿了 threading 模塊的接口,方便開(kāi)發(fā)者從線程遷移到進(jìn)程。

核心功能

  • 進(jìn)程創(chuàng)建:創(chuàng)建獨(dú)立進(jìn)程,運(yùn)行指定函數(shù)或任務(wù)。
  • 進(jìn)程池:管理一組工作進(jìn)程,分配任務(wù)。
  • 進(jìn)程通信:支持管道(Pipe)、隊(duì)列(Queue)等 IPC 機(jī)制。
  • 同步原語(yǔ):提供鎖(Lock)、信號(hào)量(Semaphore)、事件(Event)等。
  • 共享內(nèi)存:支持共享基本數(shù)據(jù)類(lèi)型(Value)和數(shù)組(Array)。
  • 跨平臺(tái):在 Windows、Linux、macOS 上運(yùn)行一致。

依賴(lài):標(biāo)準(zhǔn)庫(kù),無(wú)需額外安裝。

1.2 原理

  • 進(jìn)程 vs 線程
    • 進(jìn)程:獨(dú)立的內(nèi)存空間,擁有自己的 Python 解釋器和 GIL,適合 CPU 密集型任務(wù)。
    • 線程:共享內(nèi)存空間,受 GIL 限制,適合 I/O 密集型任務(wù)。
  • GIL 繞過(guò):每個(gè)進(jìn)程有獨(dú)立的 GIL,允許多核并行。
  • 進(jìn)程創(chuàng)建
    • Linux/macOS:使用 fork(復(fù)制父進(jìn)程),或 spawn(新進(jìn)程)。
    • Windows:始終使用 spawn,啟動(dòng)新解釋器。
  • 通信開(kāi)銷(xiāo):進(jìn)程間通信(如 Queue)比線程慢,需優(yōu)化設(shè)計(jì)。

1.3 導(dǎo)入

import multiprocessing

2. multiprocessing 的核心組件和功能

2.1 進(jìn)程創(chuàng)建(Process)

通過(guò) multiprocessing.Process 創(chuàng)建進(jìn)程,運(yùn)行指定函數(shù)。

構(gòu)造函數(shù)

Process(target=None, args=(), kwargs={}, name=None, daemon=None)
  • target:目標(biāo)函數(shù)。
  • args/kwargs:函數(shù)參數(shù)。
  • name:進(jìn)程名稱(chēng)。
  • daemon:是否為守護(hù)進(jìn)程(隨主進(jìn)程退出)。

主要方法

  • start():?jiǎn)?dòng)進(jìn)程。
  • join():等待進(jìn)程結(jié)束。
  • terminate():強(qiáng)制終止進(jìn)程。
  • is_alive():檢查進(jìn)程是否存活。

示例

import multiprocessing

def worker(num):
    print(f"Worker {num} running in process {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

輸出(順序可能不同):

Worker 0 running in process Process-1
Worker 1 running in process Process-2
Worker 2 running in process Process-3
  • 說(shuō)明:創(chuàng)建 3 個(gè)進(jìn)程,每個(gè)運(yùn)行 worker 函數(shù)。

2.2 進(jìn)程池(Pool)

Pool 用于管理固定數(shù)量的進(jìn)程,適合并行處理大量任務(wù)。

構(gòu)造函數(shù)

Pool(processes=None, initializer=None, initargs=())
  • processes:進(jìn)程數(shù)(默認(rèn) CPU 核心數(shù))。
  • initializer:每個(gè)進(jìn)程的初始化函數(shù)。
  • initargs:初始化函數(shù)參數(shù)。

主要方法

  • map(func, iterable):并行執(zhí)行 func 應(yīng)用于 iterable,返回結(jié)果列表。
  • imap(func, iterable):惰性版本,返回迭代器。
  • apply(func, args=(), kwds={}):同步執(zhí)行單任務(wù)。
  • apply_async(func, args=(), kwds={}):異步執(zhí)行單任務(wù)。
  • close():關(guān)閉池,禁止新任務(wù)。
  • join():等待池內(nèi)進(jìn)程完成。

示例

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # 輸出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.3 進(jìn)程通信

支持 PipeQueue 實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)交換。

Pipe

  • 雙向或單向管道,適合兩個(gè)進(jìn)程通信。

構(gòu)造函數(shù)

Pipe(duplex=True)
  • 返回 (conn1, conn2),兩個(gè)連接對(duì)象。
  • duplex=True:雙向;False:?jiǎn)蜗颉?/li>

示例

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    print(conn.recv())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

輸出

Hello from sender

Queue

  • 線程和進(jìn)程安全的隊(duì)列,適合多生產(chǎn)者/消費(fèi)者場(chǎng)景。

構(gòu)造函數(shù)

Queue(maxsize=0)
  • maxsize:最大容量(0 表示無(wú)限制)。

示例

from multiprocessing import Process, Queue

def producer(queue):
    queue.put("Data from producer")

def consumer(queue):
    print(queue.get())

if __name__ == "__main__":
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.4 同步機(jī)制

提供鎖、信號(hào)量等原語(yǔ),確保進(jìn)程安全訪問(wèn)共享資源。

Lock

  • 互斥鎖,防止多個(gè)進(jìn)程同時(shí)訪問(wèn)資源。
  • 示例
from multiprocessing import Process, Lock

def printer(lock, msg):
    with lock:
        print(msg)

if __name__ == "__main__":
    lock = Lock()
    processes = [Process(target=printer, args=(lock, f"Message {i}")) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Semaphore

  • 控制有限資源的并發(fā)訪問(wèn)。
  • 示例
from multiprocessing import Process, Semaphore

def worker(sem, name):
    with sem:
        print(f"{name} acquired resource")
        # 模擬工作

if __name__ == "__main__":
    sem = Semaphore(2)  # 允許 2 個(gè)進(jìn)程同時(shí)訪問(wèn)
    processes = [Process(target=worker, args=(sem, f"Worker {i}")) for i in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Event

  • 進(jìn)程間信號(hào)通知。
  • 示例
from multiprocessing import Process, Event
import time

def wait_for_event(event):
    event.wait()
    print("Event triggered")

if __name__ == "__main__":
    event = Event()
    p = Process(target=wait_for_event, args=(event,))
    p.start()
    time.sleep(1)
    event.set()  # 觸發(fā)事件
    p.join()

2.5 共享內(nèi)存

通過(guò) ValueArray 共享基本數(shù)據(jù)類(lèi)型。

  • Value:?jiǎn)蝹€(gè)共享值。
  • Array:共享數(shù)組。

示例

from multiprocessing import Process, Value, Array

def modify(shared_num, shared_arr):
    shared_num.value += 1
    for i in range(len(shared_arr)):
        shared_arr[i] += 1

if __name__ == "__main__":
    num = Value("i", 0)  # 共享整數(shù)
    arr = Array("i", [1, 2, 3])  # 共享數(shù)組
    p = Process(target=modify, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 輸出: 1
    print(list(arr))  # 輸出: [2, 3, 4]

3. 應(yīng)用場(chǎng)景

數(shù)值計(jì)算

  • 并行處理矩陣運(yùn)算、蒙特卡洛模擬。
  • 示例:計(jì)算大數(shù)組的平方。

圖像處理

  • 并行處理圖像濾波、特征提取。
  • 示例:批量應(yīng)用卷積濾波。

機(jī)器學(xué)習(xí)

  • 并行訓(xùn)練模型或處理數(shù)據(jù)預(yù)處理。
  • 示例:并行特征提取。

數(shù)據(jù)處理

  • 并行處理 CSV 文件、數(shù)據(jù)庫(kù)查詢(xún)。
  • 示例:多進(jìn)程解析日志文件。

爬蟲(chóng)

  • 并行抓取網(wǎng)頁(yè)(注意網(wǎng)絡(luò)限制)。
  • 示例:結(jié)合 urllib 并發(fā)下載。

4. 示例:多進(jìn)程爬蟲(chóng)

結(jié)合 urllibQueue 實(shí)現(xiàn)并行網(wǎng)頁(yè)抓取。

示例

import urllib.request
from multiprocessing import Process, Queue
from urllib.error import URLError

def fetch_url(queue, url):
    try:
        with urllib.request.urlopen(url) as response:
            content = response.read().decode("utf-8")
            queue.put((url, len(content)))
    except URLError as e:
        queue.put((url, str(e)))

def main():
    urls = ["https://example.com", "https://python.org", "https://invalid-url"]
    queue = Queue()
    processes = [Process(target=fetch_url, args=(queue, url)) for url in urls]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    while not queue.empty():
        url, result = queue.get()
        print(f"{url}: {result}")

if __name__ == "__main__":
    main()

輸出(示例):

https://example.com: 1256
https://python.org: 50000
https://invalid-url: [Errno 11001] getaddrinfo failed

5. 最佳實(shí)踐

使用 if __name__ == "__main__":

  • 防止 Windows 和某些 Unix 系統(tǒng)重復(fù)導(dǎo)入模塊。

示例

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()

選擇進(jìn)程池

  • 對(duì)于批量任務(wù),使用 Pool 簡(jiǎn)化管理。

示例

with Pool(4) as pool:
    results = pool.map(func, data)

優(yōu)化通信

  • 盡量減少進(jìn)程間通信,使用共享內(nèi)存或批量傳遞數(shù)據(jù)。

示例

arr = Array("i", [0] * size)

異常處理

  • 在子進(jìn)程中捕獲異常,通過(guò) Queue 或日志返回。

示例

def worker(queue):
    try:
        # 工作代碼
    except Exception as e:
        queue.put(str(e))

測(cè)試代碼

  • 使用 pytest 測(cè)試多進(jìn)程行為。

示例

import pytest
from multiprocessing import Process

def test_process():
    def worker():
        print("Test")
    p = Process(target=worker)
    p.start()
    p.join()
    assert p.exitcode == 0

進(jìn)程數(shù)選擇

  • 默認(rèn)使用 CPU 核心數(shù)(multiprocessing.cpu_count())。

示例

processes = min(len(tasks), multiprocessing.cpu_count())

6. 注意事項(xiàng)

GIL 限制

  • multiprocessing 繞過(guò) GIL,適合 CPU 密集型任務(wù);I/O 密集型任務(wù)考慮 threadingasyncio。

示例

# I/O 密集型:使用 asyncio
import asyncio
async def fetch():
    pass

Windows 兼容性

  • Windows 使用 spawn,需確保代碼在 if __name__ == "__main__": 中。

示例

if __name__ == "__main__":
    main()

資源管理

  • 及時(shí)關(guān)閉進(jìn)程和池,釋放資源。

示例

with Pool() as pool:
    pool.map(func, data)

序列化開(kāi)銷(xiāo)

  • 傳遞大數(shù)據(jù)到子進(jìn)程(如通過(guò) Queue)可能慢,使用共享內(nèi)存。

示例

shared_data = Value("d", 0.0)

調(diào)試難度

  • 子進(jìn)程錯(cuò)誤可能不易捕獲,使用日志或 Queue 返回錯(cuò)誤。

示例

import logging
logging.basicConfig(level=logging.INFO)

7. 總結(jié)

Python 的 multiprocessing 模塊是實(shí)現(xiàn)多進(jìn)程并行的強(qiáng)大工具,繞過(guò) GIL,適合 CPU 密集型任務(wù)。其核心特點(diǎn)包括:

  • 定義:提供進(jìn)程創(chuàng)建、通信、同步和共享內(nèi)存的 API。
  • 功能:支持 Process、Pool、Queue、PipeLock 等。
  • 應(yīng)用:數(shù)值計(jì)算、圖像處理、機(jī)器學(xué)習(xí)、數(shù)據(jù)處理、爬蟲(chóng)。
  • 最佳實(shí)踐:使用 if __name__ == "__main__":、優(yōu)化通信、測(cè)試代碼。

以上就是Python使用multiprocessing模塊實(shí)現(xiàn)多進(jìn)程并行計(jì)算的詳細(xì)內(nèi)容,更多關(guān)于Python multiprocessing多進(jìn)程并行計(jì)算的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Python文件操作指南解鎖三個(gè)txt文件合并技術(shù)

    Python文件操作指南解鎖三個(gè)txt文件合并技術(shù)

    本文將深入介紹如何利用Python編寫(xiě)腳本,將三個(gè)文本文件中指定的列數(shù)據(jù)合并成一個(gè)新文件,通過(guò)豐富的示例代碼和詳細(xì)解釋,幫助掌握這一實(shí)用而靈活的數(shù)據(jù)處理技巧
    2024-01-01
  • Python中socket網(wǎng)絡(luò)通信是干嘛的

    Python中socket網(wǎng)絡(luò)通信是干嘛的

    在本篇文章里小編給大家分享的是關(guān)于Python中socket網(wǎng)絡(luò)通信知識(shí)點(diǎn)內(nèi)容,需要的朋友們可以跟著學(xué)習(xí)下。
    2020-05-05
  • 在Pycharm中設(shè)置默認(rèn)自動(dòng)換行的方法

    在Pycharm中設(shè)置默認(rèn)自動(dòng)換行的方法

    今天小編就為大家分享一篇在Pycharm中設(shè)置默認(rèn)自動(dòng)換行的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-01-01
  • 如何通過(guò)Python實(shí)現(xiàn)一個(gè)消息隊(duì)列

    如何通過(guò)Python實(shí)現(xiàn)一個(gè)消息隊(duì)列

    這篇文章主要為大家詳細(xì)介紹了如何通過(guò)Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2025-02-02
  • 使用Python提取PDF文件中內(nèi)容的代碼示例和使用技巧

    使用Python提取PDF文件中內(nèi)容的代碼示例和使用技巧

    在文檔自動(dòng)化處理、數(shù)據(jù)提取和信息分析等任務(wù)中,從 PDF 文件中提取文本是一項(xiàng)常見(jiàn)需求,PDF 文件通常分為兩種類(lèi)型:基于文本的 PDF 和 包含掃描圖像的 PDF,本文將介紹如何使用 Python 分別提取這兩種類(lèi)型的 PDF 內(nèi)容,需要的朋友可以參考下
    2025-07-07
  • python gdal安裝與簡(jiǎn)單使用

    python gdal安裝與簡(jiǎn)單使用

    這篇文章主要介紹了python gdal安裝與簡(jiǎn)單使用,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-08-08
  • python可以用哪些數(shù)據(jù)庫(kù)

    python可以用哪些數(shù)據(jù)庫(kù)

    在本篇文章里小編給大家整理的是關(guān)于python支持哪些數(shù)據(jù)庫(kù)的相關(guān)知識(shí)點(diǎn)內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2020-06-06
  • Python基礎(chǔ)之賦值,淺拷貝,深拷貝的區(qū)別

    Python基礎(chǔ)之賦值,淺拷貝,深拷貝的區(qū)別

    這篇文章主要介紹了Python基礎(chǔ)之賦值,淺拷貝,深拷貝的區(qū)別,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python基礎(chǔ)的小伙伴們也有非常好的幫助,需要的朋友可以參考下
    2021-04-04
  • 極簡(jiǎn)Python庫(kù)CherryPy構(gòu)建高性能Web應(yīng)用實(shí)例探索

    極簡(jiǎn)Python庫(kù)CherryPy構(gòu)建高性能Web應(yīng)用實(shí)例探索

    今天為大家介紹的是 CherryPy,它是一個(gè)極簡(jiǎn)、穩(wěn)定且功能強(qiáng)大的Web框架,可以幫助開(kāi)發(fā)者快速構(gòu)建高性能的 Web 應(yīng)用程序,使用 CherryPy,你可以輕松地創(chuàng)建RESTful API、靜態(tài)網(wǎng)站、異步任務(wù)和 WebSocket 等應(yīng)用
    2024-01-01
  • 對(duì)python的bytes類(lèi)型數(shù)據(jù)split分割切片方法

    對(duì)python的bytes類(lèi)型數(shù)據(jù)split分割切片方法

    今天小編就為大家分享一篇對(duì)python的bytes類(lèi)型數(shù)據(jù)split分割切片方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-12-12

最新評(píng)論