欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python子進(jìn)程中創(chuàng)建多線程的完整指南

 更新時(shí)間:2025年08月28日 09:09:24   作者:Yant224  
在操作系統(tǒng)中,進(jìn)程是資源分配的基本單位,每個(gè)進(jìn)程都有獨(dú)立的內(nèi)存空間、文件描述符等系統(tǒng)資源,而線程是進(jìn)程內(nèi)的執(zhí)行單元,多個(gè)線程共享同一進(jìn)程的資源,本文給大家介紹了Python子進(jìn)程中創(chuàng)建多線程的完整指南,需要的朋友可以參考下

一、理解進(jìn)程與線程的關(guān)系

1.1 進(jìn)程與線程的基本概念

在操作系統(tǒng)中,進(jìn)程是資源分配的基本單位,每個(gè)進(jìn)程都有獨(dú)立的內(nèi)存空間、文件描述符等系統(tǒng)資源。而線程是進(jìn)程內(nèi)的執(zhí)行單元,多個(gè)線程共享同一進(jìn)程的資源。

Python中的特殊之處在于**全局解釋器鎖(GIL)**的存在,這使得在單個(gè)進(jìn)程中,多線程無(wú)法真正并行執(zhí)行CPU密集型任務(wù)。但在I/O密集型任務(wù)中,多線程仍然能顯著提升性能。

1.2 子進(jìn)程內(nèi)多線程的架構(gòu)模型

這種架構(gòu)的優(yōu)勢(shì)在于:

  • 充分利用多核CPU:每個(gè)子進(jìn)程可以在不同的CPU核心上運(yùn)行
  • 資源共享與隔離平衡:線程共享進(jìn)程資源,進(jìn)程間資源隔離
  • 靈活的任務(wù)分配:可以根據(jù)任務(wù)特性選擇進(jìn)程級(jí)或線程級(jí)并行

二、實(shí)現(xiàn)原理與技術(shù)細(xì)節(jié)

2.1 Python的多進(jìn)程模塊

Python提供了multiprocessing模塊來(lái)創(chuàng)建和管理進(jìn)程:

import multiprocessing
import os

def worker():
    print(f"進(jìn)程ID: {os.getpid()}, 進(jìn)程名稱: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

2.2 進(jìn)程間通信(IPC)機(jī)制

由于進(jìn)程有獨(dú)立的內(nèi)存空間,必須使用特殊的通信機(jī)制:

通信方式描述適用場(chǎng)景
Queue先進(jìn)先出的隊(duì)列生產(chǎn)者-消費(fèi)者模式
Pipe雙向通信通道一對(duì)一通信
Shared Memory共享內(nèi)存區(qū)域高性能數(shù)據(jù)共享
Manager托管共享對(duì)象復(fù)雜數(shù)據(jù)結(jié)構(gòu)共享

三、三種實(shí)現(xiàn)方法詳解

3.1 方法一:基礎(chǔ)組合方式

import multiprocessing
import threading
import time

def thread_task(thread_id):
    """線程工作函數(shù)"""
    print(f"線程 {thread_id} 在進(jìn)程 {multiprocessing.current_process().name} 中運(yùn)行")
    time.sleep(2)
    return f"線程 {thread_id} 完成"

def process_task():
    """進(jìn)程工作函數(shù)"""
    print(f"進(jìn)程 {multiprocessing.current_process().name} 啟動(dòng)")
    
    # 創(chuàng)建并啟動(dòng)多個(gè)線程
    threads = []
    results = []
    
    for i in range(4):
        t = threading.Thread(target=thread_task, args=(i,))
        threads.append(t)
        t.start()
    
    # 等待所有線程完成
    for t in threads:
        t.join()
    
    print(f"進(jìn)程 {multiprocessing.current_process().name} 結(jié)束")

if __name__ == "__main__":
    # 創(chuàng)建3個(gè)子進(jìn)程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=process_task, name=f"SubProcess-{i}")
        processes.append(p)
        p.start()
    
    # 等待所有子進(jìn)程完成
    for p in processes:
        p.join()
    
    print("所有進(jìn)程完成")

3.2 方法二:使用進(jìn)程池和線程池

import concurrent.futures
import multiprocessing
import threading
import time

def thread_worker(data):
    """線程池工作函數(shù)"""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(0.5)  # 模擬工作負(fù)載
    return f"{process_name}-{thread_name} 處理: {data}"

def process_worker(data_chunk):
    """進(jìn)程池工作函數(shù)"""
    print(f"進(jìn)程 {multiprocessing.current_process().name} 開(kāi)始處理 {len(data_chunk)} 個(gè)項(xiàng)目")
    
    # 使用線程池處理數(shù)據(jù)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(thread_worker, data_chunk))
    
    return results

if __name__ == "__main__":
    # 準(zhǔn)備數(shù)據(jù)
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i+chunk_size] for i in range(0, len(all_data), chunk_size)]
    
    # 使用進(jìn)程池
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_worker, chunk) for chunk in data_chunks]
        
        # 收集結(jié)果
        all_results = []
        for future in concurrent.futures.as_completed(futures):
            all_results.extend(future.result())
    
    print("處理完成,結(jié)果:")
    for result in all_results:
        print(f"  {result}")

3.3 方法三:自定義進(jìn)程類

import multiprocessing
import threading
import time

class ThreadedProcess(multiprocessing.Process):
    def __init__(self, task_id, data_list):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        self.results = multiprocessing.Manager().list()
    
    def run(self):
        print(f"進(jìn)程 {self.name} 開(kāi)始處理任務(wù) {self.task_id}")
        
        # 創(chuàng)建線程
        threads = []
        for i, data in enumerate(self.data_list):
            thread = threading.Thread(
                target=self.process_item,
                args=(data, i)
            )
            threads.append(thread)
            thread.start()
        
        # 等待所有線程完成
        for thread in threads:
            thread.join()
        
        print(f"進(jìn)程 {self.name} 完成任務(wù) {self.task_id}")
    
    def process_item(self, data, index):
        """處理單個(gè)數(shù)據(jù)項(xiàng)"""
        thread_name = threading.current_thread().name
        print(f"線程 {thread_name} 處理: {data}")
        time.sleep(0.3)  # 模擬處理時(shí)間
        
        # 處理數(shù)據(jù)并保存結(jié)果
        result = {
            'index': index,
            'data': data,
            'processed': data.upper(),  # 示例處理
            'thread': thread_name,
            'process': self.name
        }
        self.results.append(result)

if __name__ == "__main__":
    # 準(zhǔn)備任務(wù)數(shù)據(jù)
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow'])
    ]
    
    # 創(chuàng)建并啟動(dòng)進(jìn)程
    processes = []
    for task_id, data_list in tasks:
        process = ThreadedProcess(task_id, data_list)
        processes.append(process)
        process.start()
    
    # 等待所有進(jìn)程完成
    for process in processes:
        process.join()
    
    # 匯總結(jié)果
    all_results = []
    for process in processes:
        all_results.extend(list(process.results))
    
    print("\n所有任務(wù)完成,結(jié)果匯總:")
    for result in all_results:
        print(f"  任務(wù){(diào)result['index']}: {result['data']} -> {result['processed']}")

四、進(jìn)程間通信實(shí)戰(zhàn)

4.1 使用Queue進(jìn)行進(jìn)程間通信

import multiprocessing
import threading
import time
import random

def producer(queue, producer_id):
    """生產(chǎn)者線程函數(shù)"""
    for i in range(5):
        item = f"生產(chǎn)者{producer_id}-項(xiàng)目{i}"
        queue.put(item)
        print(f"生產(chǎn): {item}")
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(f"生產(chǎn)者{producer_id}-完成")

def consumer_process(queue, consumer_id):
    """消費(fèi)者進(jìn)程函數(shù)"""
    print(f"消費(fèi)者進(jìn)程 {consumer_id} 啟動(dòng)")
    
    completed_producers = 0
    total_producers = 2  # 假設(shè)有2個(gè)生產(chǎn)者
    
    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=5)
            if item.endswith("-完成"):
                completed_producers += 1
                print(f"消費(fèi)者{consumer_id} 收到完成信號(hào): {item}")
            else:
                print(f"消費(fèi)者{consumer_id} 處理: {item}")
                time.sleep(random.uniform(0.2, 0.8))  # 模擬處理時(shí)間
        except queue.Empty:
            print(f"消費(fèi)者{consumer_id} 等待超時(shí)")
            break
    
    print(f"消費(fèi)者進(jìn)程 {consumer_id} 結(jié)束")

def producer_process(queue, process_id):
    """生產(chǎn)者進(jìn)程函數(shù)"""
    print(f"生產(chǎn)者進(jìn)程 {process_id} 啟動(dòng)")
    
    # 在生產(chǎn)者進(jìn)程中創(chuàng)建多個(gè)線程
    producer_threads = []
    for i in range(2):  # 每個(gè)進(jìn)程創(chuàng)建2個(gè)生產(chǎn)者線程
        thread = threading.Thread(
            target=producer,
            args=(queue, f"P{process_id}-T{i}")
        )
        producer_threads.append(thread)
        thread.start()
    
    # 等待所有生產(chǎn)者線程完成
    for thread in producer_threads:
        thread.join()
    
    print(f"生產(chǎn)者進(jìn)程 {process_id} 結(jié)束")

if __name__ == "__main__":
    # 創(chuàng)建進(jìn)程間通信隊(duì)列
    queue = multiprocessing.Queue(maxsize=10)
    
    # 創(chuàng)建生產(chǎn)者進(jìn)程
    producer_processes = []
    for i in range(2):
        p = multiprocessing.Process(
            target=producer_process,
            args=(queue, i)
        )
        producer_processes.append(p)
        p.start()
    
    # 創(chuàng)建消費(fèi)者進(jìn)程
    consumer_processes = []
    for i in range(2):
        c = multiprocessing.Process(
            target=consumer_process,
            args=(queue, i)
        )
        consumer_processes.append(c)
        c.start()
    
    # 等待所有進(jìn)程完成
    for p in producer_processes:
        p.join()
    
    for c in consumer_processes:
        c.join()
    
    print("所有生產(chǎn)消費(fèi)任務(wù)完成")

五、性能優(yōu)化與最佳實(shí)踐

5.1 資源管理策略

1.合理設(shè)置進(jìn)程和線程數(shù)量

import os

# 根據(jù)CPU核心數(shù)設(shè)置進(jìn)程數(shù)
cpu_count = os.cpu_count()
process_pool_size = max(1, cpu_count - 1)  # 留一個(gè)核心給系統(tǒng)

# 根據(jù)任務(wù)類型設(shè)置線程數(shù)
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O密集型可以更多線程
else:
    thread_pool_size = cpu_count  # CPU密集型不宜過(guò)多

2.使用連接池管理資源

from multiprocessing import Pool
import threading
import database  # 假設(shè)的數(shù)據(jù)庫(kù)模塊

# 進(jìn)程級(jí)別的連接池
process_conn_pool = None

def init_process():
    global process_conn_pool
    process_conn_pool = database.ConnectionPool(max_connections=5)

def thread_task(query):
    # 從進(jìn)程級(jí)連接池獲取連接
    conn = process_conn_pool.get_connection()
    try:
        result = conn.execute(query)
        return result
    finally:
        process_conn_pool.release_connection(conn)

def process_worker(queries):
    with threading.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(thread_task, queries))
    return results

if __name__ == "__main__":
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]

    with Pool(processes=2, initializer=init_process) as pool:
        results = pool.map(process_worker, [queries[:5], queries[5:]])

5.2 錯(cuò)誤處理與重試機(jī)制

import multiprocessing
import threading
import time
from functools import wraps

def retry(max_attempts=3, delay=1):
    """重試裝飾器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    print(f"嘗試 {attempts} 失敗: {e}")
                    if attempts < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """可靠的線程任務(wù)"""
    # 模擬可能失敗的操作
    if random.random() < 0.3:  # 30%概率失敗
        raise ValueError("隨機(jī)失敗")
    
    time.sleep(0.5)
    return f"成功處理: {data}"

def robust_process():
    """健壯的進(jìn)程函數(shù)"""
    try:
        threads = []
        results = []
        
        for i in range(5):
            t = threading.Thread(
                target=lambda: results.append(reliable_thread_task(f"data-{i}"))
            )
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        print(f"處理結(jié)果: {results}")
        
    except Exception as e:
        print(f"進(jìn)程失敗: {e}")
        # 這里可以添加更復(fù)雜的錯(cuò)誤處理邏輯

if __name__ == "__main__":
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=robust_process)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

六、實(shí)戰(zhàn)應(yīng)用:Web服務(wù)請(qǐng)求處理

import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # 模擬處理時(shí)間
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'path': self.path,
            'processing_time': processing_time,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name
        }
        
        self.wfile.write(json.dumps(response).encode())

def run_server(port):
    """運(yùn)行HTTP服務(wù)器"""
    server = HTTPServer(('localhost', port), RequestHandler)
    print(f"服務(wù)器在進(jìn)程 {multiprocessing.current_process().name} 中啟動(dòng),端口: {port}")
    server.serve_forever()

def health_checker(server_ports):
    """健康檢查線程"""
    while True:
        time.sleep(5)
        print(f"健康檢查: 服務(wù)器進(jìn)程正常運(yùn)行,監(jiān)控端口: {server_ports}")

def server_process(port):
    """服務(wù)器進(jìn)程函數(shù)"""
    # 創(chuàng)建服務(wù)器線程
    server_thread = threading.Thread(
        target=run_server,
        args=(port,),
        daemon=True
    )
    
    # 創(chuàng)建健康檢查線程
    health_thread = threading.Thread(
        target=health_checker,
        args=([port],),
        daemon=True
    )
    
    server_thread.start()
    health_thread.start()
    
    # 等待服務(wù)器線程結(jié)束
    server_thread.join()

if __name__ == "__main__":
    # 啟動(dòng)多個(gè)服務(wù)器進(jìn)程,每個(gè)進(jìn)程在不同的端口上運(yùn)行
    ports = [8000, 8001, 8002]
    processes = []
    
    for port in ports:
        p = multiprocessing.Process(
            target=server_process,
            args=(port,),
            name=f"ServerProcess-{port}"
        )
        processes.append(p)
        p.start()
    
    print(f"啟動(dòng)了 {len(processes)} 個(gè)服務(wù)器進(jìn)程")
    
    try:
        # 主進(jìn)程保持運(yùn)行
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("正在關(guān)閉服務(wù)器...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()
        print("所有服務(wù)器已關(guān)閉")

以上就是Python子進(jìn)程中創(chuàng)建多線程的完整指南的詳細(xì)內(nèi)容,更多關(guān)于Python子進(jìn)程創(chuàng)建多線程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • YOLOv5構(gòu)建安全帽檢測(cè)和識(shí)別系統(tǒng)使用詳解

    YOLOv5構(gòu)建安全帽檢測(cè)和識(shí)別系統(tǒng)使用詳解

    這篇文章主要為大家介紹了YOLOv5構(gòu)建安全帽檢測(cè)和識(shí)別系統(tǒng)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • python 追蹤except信息方式

    python 追蹤except信息方式

    這篇文章主要介紹了python 追蹤except信息方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-04-04
  • Python Django搭建文件下載服務(wù)器的實(shí)現(xiàn)

    Python Django搭建文件下載服務(wù)器的實(shí)現(xiàn)

    這篇文章主要介紹了Python Django搭建文件下載服務(wù)器的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • Python實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)隊(duì)列的方法

    Python實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)隊(duì)列的方法

    這篇文章主要介紹了Python實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)隊(duì)列的方法,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • python3的print()函數(shù)的用法圖文講解

    python3的print()函數(shù)的用法圖文講解

    在本篇內(nèi)容里小編給各位分享的是關(guān)于python3的print()函數(shù)的用法知識(shí)點(diǎn),對(duì)此有需要的朋友們跟著學(xué)習(xí)下吧。
    2019-07-07
  • Python+Tkinter制作股票數(shù)據(jù)抓取小程序

    Python+Tkinter制作股票數(shù)據(jù)抓取小程序

    這篇文章主要為大家詳細(xì)介紹了如何實(shí)現(xiàn)一個(gè)Tkinter?GUI程序,完成無(wú)代碼股票抓取!文中的示例代碼講解詳細(xì),快跟小編一起動(dòng)手試一試吧
    2022-08-08
  • 一文詳解Python中常用的初等函數(shù)(內(nèi)置函數(shù))

    一文詳解Python中常用的初等函數(shù)(內(nèi)置函數(shù))

    初等函數(shù)是由基本初等函數(shù)經(jīng)過(guò)有限次的四則運(yùn)算和復(fù)合運(yùn)算所得到的函數(shù),這篇文章主要介紹了Python中常用初等函數(shù)(內(nèi)置函數(shù))的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-06-06
  • python?GUI多行輸入文本Text的實(shí)現(xiàn)

    python?GUI多行輸入文本Text的實(shí)現(xiàn)

    這篇文章主要介紹了python?GUI多行輸入文本Text的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • 詳解pandas DataFrame的查詢方法(loc,iloc,at,iat,ix的用法和區(qū)別)

    詳解pandas DataFrame的查詢方法(loc,iloc,at,iat,ix的用法和區(qū)別)

    這篇文章主要介紹了詳解pandas DataFrame的查詢方法(loc,iloc,at,iat,ix的用法和區(qū)別),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • springboot整合單機(jī)緩存ehcache的實(shí)現(xiàn)

    springboot整合單機(jī)緩存ehcache的實(shí)現(xiàn)

    本文主要介紹了springboot整合單機(jī)緩存ehcache的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02

最新評(píng)論