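A Complete Guide to Creating Multiple Threads Inside Python Child Processes
I. Understanding the Relationship Between Processes and Threads
1.1 Basic concepts of processes and threads
In an operating system, a process is the basic unit of resource allocation: each process has its own memory space, file descriptors, and other system resources. A thread is a unit of execution inside a process, and all threads of a process share that process's resources.
What sets Python apart is the **global interpreter lock (GIL)**: in standard CPython builds, only one thread per process executes Python bytecode at a time, so multithreading cannot run CPU-bound tasks truly in parallel. For I/O-bound tasks, multithreading can still improve performance significantly, because a thread releases the GIL while it waits on I/O.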
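The I/O-bound case can be illustrated with a small timing sketch. It assumes `time.sleep` as a stand-in for a blocking I/O call (both release the GIL while waiting); the function names `fake_io`, `run_sequential`, and `run_threaded` are made up for this illustration.

```python
import threading
import time


def fake_io(delay):
    """Stand-in for a blocking I/O call; time.sleep releases the GIL."""
    time.sleep(delay)


def run_sequential(count, delay):
    """Run the waits one after another and return the elapsed time."""
    start = time.perf_counter()
    for _ in range(count):
        fake_io(delay)
    return time.perf_counter() - start


def run_threaded(count, delay):
    """Run the waits in parallel threads and return the elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequential(4, 0.2):.2f}s")
    print(f"threaded:   {run_threaded(4, 0.2):.2f}s")
```

The threaded run should take roughly as long as one wait, while the sequential run takes roughly the sum of all waits. Replacing the sleep with a pure-Python computation would remove most of that gap on a standard CPython build.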
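1.2 Architecture of multithreading inside child processes

The advantages of this architecture are:
- Full use of multi-core CPUs: each child process can run on a different CPU core
- A balance between sharing and isolation: threads share their process's resources, while resources are isolated between processes
- Flexible task distribution: parallelism can be applied at the process level or the thread level, depending on the nature of the task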
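The sharing-versus-isolation point can be observed directly. The following is a minimal sketch, assuming a module-level dictionary named `counter` as the shared state (a name invented for this example): threads in one process all change the same object, while a child process only changes its own copy.

```python
import multiprocessing
import threading

counter = {"value": 0}    # module-level state used to observe sharing
lock = threading.Lock()   # guards the read-modify-write in increment()


def increment():
    with lock:
        counter["value"] += 1


def run_in_threads(thread_count):
    """Threads of one process mutate the same object."""
    threads = [threading.Thread(target=increment) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter["value"]


if __name__ == "__main__":
    print("after 3 threads:", run_in_threads(3))
    child = multiprocessing.Process(target=increment)
    child.start()
    child.join()
    # The child incremented its own copy; the parent's value is unchanged.
    print("after child process:", counter["value"])
```

This is why results produced in a child process have to be sent back explicitly through an inter-process communication mechanism.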
II. Implementation Principles and Technical Details
2.1 Python's multiprocessing module
Python provides the multiprocessing module for creating and managing processes:
import multiprocessing
import os

def worker():
    print(f"Process ID: {os.getpid()}, process name: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
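2.2 Inter-process communication (IPC) mechanisms
Because each process has its own memory space, processes must exchange data through dedicated communication mechanisms:
| Mechanism | Description | Typical use |
|---|---|---|
| Queue | First-in, first-out queue | Producer-consumer pattern |
| Pipe | Two-way communication channel | One-to-one communication |
| Shared Memory | Shared memory region | High-performance data sharing |
| Manager | Managed shared objects | Sharing complex data structures |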
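Two of the mechanisms in the table, Pipe and shared memory, can be sketched in a few lines. The example below is illustrative only: the helper names `square_worker` and `add_to_counter` are invented here, and `multiprocessing.Value` is used as one simple form of shared memory.

```python
import multiprocessing


def square_worker(conn, number):
    """Send the square of `number` through one end of a Pipe."""
    conn.send(number * number)
    conn.close()


def add_to_counter(counter, amount):
    """Add `amount` to a shared integer; the lock guards the read-modify-write."""
    with counter.get_lock():
        counter.value += amount


if __name__ == "__main__":
    # Pipe: a one-to-one channel between the parent and a single child.
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=square_worker, args=(child_conn, 7))
    p.start()
    print("Received over Pipe:", parent_conn.recv())
    p.join()

    # Value: a single C int ("i") that lives in shared memory.
    counter = multiprocessing.Value("i", 0)
    workers = [
        multiprocessing.Process(target=add_to_counter, args=(counter, 5))
        for _ in range(3)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("Shared counter:", counter.value)
```

A Pipe suits a fixed pair of endpoints, while a shared Value avoids copying data at the cost of explicit locking.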
III. Three Implementation Approaches in Detail
3.1 Approach 1: basic combination
import multiprocessing
import threading
import time

def thread_task(thread_id):
    """Thread worker function."""
    print(f"Thread {thread_id} running in process {multiprocessing.current_process().name}")
    time.sleep(2)
    # Note: the return value of a Thread target is discarded
    return f"Thread {thread_id} done"

def process_task():
    """Process worker function."""
    print(f"Process {multiprocessing.current_process().name} started")
    # Create and start several threads
    threads = []
    for i in range(4):
        t = threading.Thread(target=thread_task, args=(i,))
        threads.append(t)
        t.start()
    # Wait for all threads to finish
    for t in threads:
        t.join()
    print(f"Process {multiprocessing.current_process().name} finished")

if __name__ == "__main__":
    # Create 3 child processes
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=process_task, name=f"SubProcess-{i}")
        processes.append(p)
        p.start()
    # Wait for all child processes to finish
    for p in processes:
        p.join()
    print("All processes finished")
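In the basic combination, whatever a thread function returns is lost, because `threading.Thread` ignores the return value of its target. One possible way to get results back to the parent is sketched below. It assumes a thread-safe `queue.Queue` inside each process and a `multiprocessing.Queue` between processes; `run_threads` and the two-argument `thread_task` are names introduced for this sketch.

```python
import multiprocessing
import queue
import threading


def thread_task(thread_id, results):
    """Put the result on a thread-safe queue instead of returning it."""
    results.put((thread_id, f"thread {thread_id} done"))


def run_threads(thread_count):
    """Run the threads and return their results sorted by thread id."""
    results = queue.Queue()
    threads = [
        threading.Thread(target=thread_task, args=(i, results))
        for i in range(thread_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    collected = []
    while not results.empty():  # safe here: every producer thread has finished
        collected.append(results.get())
    return sorted(collected)


def process_task(result_queue, thread_count):
    """Process entry point: forward the thread results to the parent."""
    name = multiprocessing.current_process().name
    result_queue.put((name, run_threads(thread_count)))


if __name__ == "__main__":
    result_queue = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(
            target=process_task, args=(result_queue, 2), name=f"SubProcess-{i}"
        )
        for i in range(2)
    ]
    for p in processes:
        p.start()
    # Read one result per process before join(), so no child blocks on a full pipe.
    for _ in processes:
        print(result_queue.get())
    for p in processes:
        p.join()
```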
3.2 Approach 2: process pool and thread pool
import concurrent.futures
import multiprocessing
import threading
import time

def thread_worker(data):
    """Thread pool worker function."""
    process_name = multiprocessing.current_process().name
    thread_name = threading.current_thread().name
    time.sleep(0.5)  # simulate a workload
    return f"{process_name}-{thread_name} processed: {data}"

def process_worker(data_chunk):
    """Process pool worker function."""
    print(f"Process {multiprocessing.current_process().name} starts processing {len(data_chunk)} items")
    # Process the data with a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(thread_worker, data_chunk))
    return results

if __name__ == "__main__":
    # Prepare the data
    all_data = [f"data_{i}" for i in range(20)]
    chunk_size = 5
    data_chunks = [all_data[i:i+chunk_size] for i in range(0, len(all_data), chunk_size)]
    # Use a process pool
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_worker, chunk) for chunk in data_chunks]
        # Collect the results (as_completed yields in completion order, not submission order)
        all_results = []
        for future in concurrent.futures.as_completed(futures):
            all_results.extend(future.result())
    print("Processing finished, results:")
    for result in all_results:
        print(f"  {result}")
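Because `as_completed` yields futures in the order they finish, the combined results above are not guaranteed to follow the order of the input chunks. If the order matters, one option is to remember each future's chunk index. The helper `gather_in_order` and the sample `process_chunk` below are hypothetical names; the helper accepts any executor, so the same function works with `ProcessPoolExecutor` and `ThreadPoolExecutor`.

```python
import concurrent.futures


def process_chunk(chunk):
    """Hypothetical per-chunk work: upper-case every item."""
    return [item.upper() for item in chunk]


def gather_in_order(executor, func, chunks):
    """Submit every chunk, then store each result at its chunk's index."""
    future_to_index = {
        executor.submit(func, chunk): index for index, chunk in enumerate(chunks)
    }
    ordered = [None] * len(chunks)
    for future in concurrent.futures.as_completed(future_to_index):
        ordered[future_to_index[future]] = future.result()
    return ordered


if __name__ == "__main__":
    chunks = [["a", "b"], ["c"], ["d", "e"]]
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        print(gather_in_order(executor, process_chunk, chunks))
```

When no per-future handling is needed, `executor.map` is a simpler alternative, since it returns results in input order.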
3.3 Approach 3: custom process class
import multiprocessing
import threading
import time

class ThreadedProcess(multiprocessing.Process):
    def __init__(self, task_id, data_list):
        super().__init__()
        self.task_id = task_id
        self.data_list = data_list
        # A Manager list is a proxy object, so items appended in the child
        # process stay readable from the parent. Note that each instance
        # starts its own Manager server process.
        self.results = multiprocessing.Manager().list()

    def run(self):
        print(f"Process {self.name} starts task {self.task_id}")
        # Create the threads
        threads = []
        for i, data in enumerate(self.data_list):
            thread = threading.Thread(
                target=self.process_item,
                args=(data, i)
            )
            threads.append(thread)
            thread.start()
        # Wait for all threads to finish
        for thread in threads:
            thread.join()
        print(f"Process {self.name} finished task {self.task_id}")

    def process_item(self, data, index):
        """Process a single data item."""
        thread_name = threading.current_thread().name
        print(f"Thread {thread_name} processing: {data}")
        time.sleep(0.3)  # simulate processing time
        # Process the data and store the result
        result = {
            'index': index,
            'data': data,
            'processed': data.upper(),  # example processing
            'thread': thread_name,
            'process': self.name
        }
        self.results.append(result)

if __name__ == "__main__":
    # Prepare the task data
    tasks = [
        (1, ['apple', 'banana', 'cherry']),
        (2, ['dog', 'elephant', 'fox']),
        (3, ['green', 'blue', 'red', 'yellow'])
    ]
    # Create and start the processes
    processes = []
    for task_id, data_list in tasks:
        process = ThreadedProcess(task_id, data_list)
        processes.append(process)
        process.start()
    # Wait for all processes to finish
    for process in processes:
        process.join()
    # Combine the results
    all_results = []
    for process in processes:
        all_results.extend(list(process.results))
    print("\nAll tasks finished, result summary:")
    for result in all_results:
        print(f"  Item {result['index']}: {result['data']} -> {result['processed']}")
IV. Inter-process Communication in Practice
4.1 Inter-process communication with Queue
import multiprocessing
import threading
import time
import random
from queue import Empty  # raised by Queue.get() when the timeout expires

def producer(queue, producer_id):
    """Producer thread function."""
    for i in range(5):
        item = f"producer{producer_id}-item{i}"
        queue.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(f"producer{producer_id}-done")

def consumer_process(queue, consumer_id):
    """Consumer process function."""
    print(f"Consumer process {consumer_id} started")
    completed_producers = 0
    # The 4 producer threads send one completion signal each; with
    # 2 consumers, each consumer stops after receiving 2 of them
    total_producers = 2
    while completed_producers < total_producers:
        try:
            item = queue.get(timeout=5)
            if item.endswith("-done"):
                completed_producers += 1
                print(f"Consumer {consumer_id} received completion signal: {item}")
            else:
                print(f"Consumer {consumer_id} processing: {item}")
                time.sleep(random.uniform(0.2, 0.8))  # simulate processing time
        except Empty:
            print(f"Consumer {consumer_id} timed out waiting")
            break
    print(f"Consumer process {consumer_id} finished")

def producer_process(queue, process_id):
    """Producer process function."""
    print(f"Producer process {process_id} started")
    # Create several threads inside the producer process
    producer_threads = []
    for i in range(2):  # each process creates 2 producer threads
        thread = threading.Thread(
            target=producer,
            args=(queue, f"P{process_id}-T{i}")
        )
        producer_threads.append(thread)
        thread.start()
    # Wait for all producer threads to finish
    for thread in producer_threads:
        thread.join()
    print(f"Producer process {process_id} finished")

if __name__ == "__main__":
    # Create the inter-process communication queue
    queue = multiprocessing.Queue(maxsize=10)
    # Create the producer processes
    producer_processes = []
    for i in range(2):
        p = multiprocessing.Process(
            target=producer_process,
            args=(queue, i)
        )
        producer_processes.append(p)
        p.start()
    # Create the consumer processes
    consumer_processes = []
    for i in range(2):
        c = multiprocessing.Process(
            target=consumer_process,
            args=(queue, i)
        )
        consumer_processes.append(c)
        c.start()
    # Wait for all processes to finish
    for p in producer_processes:
        p.join()
    for c in consumer_processes:
        c.join()
    print("All producer and consumer tasks finished")
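Counting completion messages ties the consumers to the exact number of producer threads. A common alternative is to have the coordinator put one sentinel per consumer on the queue once all work has been queued. The sketch below shows the idea with threads and `queue.Queue` so that it runs in a single process; `multiprocessing.Queue` offers the same `put`/`get` calls, so the pattern carries over to processes. `SENTINEL`, `consume`, and `run_pipeline` are names made up for this example.

```python
import queue
import threading

SENTINEL = None  # assumption: None never appears as a real work item


def consume(work_queue, processed):
    """Handle items until the sentinel arrives, then stop."""
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            break
        processed.append(item.upper())  # example processing


def run_pipeline(items, consumer_count):
    """Feed `items` to `consumer_count` consumers and return the sorted output."""
    work_queue = queue.Queue()
    processed = []  # list.append is safe to call from several threads in CPython
    consumers = [
        threading.Thread(target=consume, args=(work_queue, processed))
        for _ in range(consumer_count)
    ]
    for c in consumers:
        c.start()
    for item in items:
        work_queue.put(item)
    # One sentinel per consumer, queued after all real items.
    for _ in consumers:
        work_queue.put(SENTINEL)
    for c in consumers:
        c.join()
    return sorted(processed)


if __name__ == "__main__":
    print(run_pipeline(["apple", "banana", "cherry"], consumer_count=2))
```

Because each consumer removes exactly one sentinel and then exits, every consumer terminates without relying on a timeout.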
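V. Performance Optimization and Best Practices
5.1 Resource management strategies
1. Choose sensible process and thread counts:
import os

# Set the number of processes from the CPU core count
cpu_count = os.cpu_count() or 1  # os.cpu_count() can return None
process_pool_size = max(1, cpu_count - 1)  # leave one core for the system

# Set the number of threads from the task type
task_type = "io_intensive"  # example value; set it to match your workload
if task_type == "io_intensive":
    thread_pool_size = 10  # I/O-bound work can use more threads
else:
    thread_pool_size = cpu_count  # CPU-bound work should not use too many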
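The same rule of thumb can be wrapped in a function so that it is easy to reuse and test. This is only a sketch: the function name `suggest_pool_sizes` is invented here, and the figure of 10 threads for I/O-bound work is the article's example value rather than a tuned recommendation.

```python
import os


def suggest_pool_sizes(task_type, cpu_count=None):
    """Return (process_pool_size, thread_pool_size) for the given task type."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() can return None
    process_pool_size = max(1, cpu_count - 1)  # leave one core for the system
    if task_type == "io_intensive":
        thread_pool_size = 10  # example value for I/O-bound work
    else:
        thread_pool_size = cpu_count
    return process_pool_size, thread_pool_size


if __name__ == "__main__":
    print(suggest_pool_sizes("io_intensive"))
    print(suggest_pool_sizes("cpu_intensive"))
```

Passing `cpu_count` explicitly makes the function deterministic, which helps when checking the sizing logic on a machine with a different core count.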
2. Manage resources with a connection pool:
import concurrent.futures
from multiprocessing import Pool

import database  # hypothetical database module

# Process-level connection pool
process_conn_pool = None

def init_process():
    # Runs once in every worker process, so each process owns its own pool
    global process_conn_pool
    process_conn_pool = database.ConnectionPool(max_connections=5)

def thread_task(query):
    # Take a connection from the process-level pool
    conn = process_conn_pool.get_connection()
    try:
        result = conn.execute(query)
        return result
    finally:
        process_conn_pool.release_connection(conn)

def process_worker(queries):
    # ThreadPoolExecutor lives in concurrent.futures, not in threading
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(thread_task, queries))
    return results

if __name__ == "__main__":
    queries = [f"SELECT * FROM table WHERE id = {i}" for i in range(10)]
    with Pool(processes=2, initializer=init_process) as pool:
        results = pool.map(process_worker, [queries[:5], queries[5:]])
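Since `database` is a hypothetical module, the listing above cannot be run as it stands. The following stand-in shows the same get/release pattern with only the standard library. `FakeConnection`, `SimpleConnectionPool`, and `run_queries` are invented for this sketch and are not part of any real database driver.

```python
import concurrent.futures
import queue


class FakeConnection:
    """Hypothetical connection that simply echoes the query."""

    def __init__(self, conn_id):
        self.conn_id = conn_id

    def execute(self, query):
        return f"conn-{self.conn_id}: {query}"


class SimpleConnectionPool:
    """Bounded pool; get_connection blocks while every connection is in use."""

    def __init__(self, max_connections):
        self._idle = queue.Queue()
        for conn_id in range(max_connections):
            self._idle.put(FakeConnection(conn_id))

    def get_connection(self):
        return self._idle.get()

    def release_connection(self, conn):
        self._idle.put(conn)

    def idle_count(self):
        return self._idle.qsize()


def run_queries(pool, queries, max_workers=3):
    """Run the queries on a thread pool, borrowing one connection per query."""

    def task(query):
        conn = pool.get_connection()
        try:
            return conn.execute(query)
        finally:
            pool.release_connection(conn)  # always hand the connection back

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(task, queries))


if __name__ == "__main__":
    pool = SimpleConnectionPool(max_connections=2)
    for line in run_queries(pool, [f"SELECT {i}" for i in range(4)]):
        print(line)
```

With fewer connections than threads, the extra threads wait in `get_connection` until a connection is released, which caps the load on the database.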
5.2 Error handling and retry
import multiprocessing
import random
import threading
import time
from functools import wraps

def retry(max_attempts=3, delay=1):
    """Retry decorator."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    print(f"Attempt {attempts} failed: {e}")
                    if attempts < max_attempts:
                        time.sleep(delay)
                    else:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def reliable_thread_task(data):
    """Reliable thread task."""
    # Simulate an operation that may fail
    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError("random failure")
    time.sleep(0.5)
    return f"Processed successfully: {data}"

def robust_process():
    """Robust process function."""
    results = []

    def run_task(index):
        # An exception raised inside a thread does not reach the code that
        # called start()/join(), so handle it here once retries are exhausted
        try:
            results.append(reliable_thread_task(f"data-{index}"))
        except Exception as e:
            print(f"Task data-{index} failed: {e}")

    try:
        threads = []
        for i in range(5):
            # Pass i through args; a bare lambda would capture the variable i
            # and could see a later value by the time the thread runs
            t = threading.Thread(target=run_task, args=(i,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        print(f"Results: {results}")
    except Exception as e:
        print(f"Process failed: {e}")
        # More elaborate error handling can be added here

if __name__ == "__main__":
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=robust_process)
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
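An exception raised inside a plain `threading.Thread` never reaches the code that started the thread. One way to make thread failures visible is to submit the work to a `ThreadPoolExecutor`, because `future.result()` re-raises whatever the task raised. The sketch below assumes a made-up `flaky_task` that fails for odd inputs; `run_and_collect` is likewise a name chosen for this example.

```python
import concurrent.futures


def flaky_task(n):
    """Hypothetical task that fails for odd inputs."""
    if n % 2:
        raise ValueError(f"cannot process {n}")
    return n * 10


def run_and_collect(values):
    """Return (results, errors) so that failures are reported instead of lost."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(flaky_task, v): v for v in values}
        for future in concurrent.futures.as_completed(futures):
            value = futures[future]
            try:
                results[value] = future.result()  # re-raises the task's exception
            except ValueError as exc:
                errors[value] = str(exc)
    return results, errors


if __name__ == "__main__":
    results, errors = run_and_collect([0, 1, 2, 3])
    print("results:", results)
    print("errors:", errors)
```

The process function can then decide per task whether to retry, log, or abort, instead of silently losing the failure.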
VI. Practical Application: Handling Web Service Requests
import multiprocessing
import threading
import time
import random
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Simulate processing time
        processing_time = random.uniform(0.1, 1.0)
        time.sleep(processing_time)
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        response = {
            'path': self.path,
            'processing_time': processing_time,
            'process': multiprocessing.current_process().name,
            'thread': threading.current_thread().name
        }
        self.wfile.write(json.dumps(response).encode())

def run_server(port):
    """Run the HTTP server."""
    server = HTTPServer(('localhost', port), RequestHandler)
    print(f"Server started in process {multiprocessing.current_process().name}, port: {port}")
    server.serve_forever()

def health_checker(server_ports):
    """Health check thread."""
    while True:
        time.sleep(5)
        print(f"Health check: server process is running, monitored ports: {server_ports}")

def server_process(port):
    """Server process function."""
    # Create the server thread
    server_thread = threading.Thread(
        target=run_server,
        args=(port,),
        daemon=True
    )
    # Create the health check thread
    health_thread = threading.Thread(
        target=health_checker,
        args=([port],),
        daemon=True
    )
    server_thread.start()
    health_thread.start()
    # Wait for the server thread to end
    server_thread.join()

if __name__ == "__main__":
    # Start several server processes, each listening on a different port
    ports = [8000, 8001, 8002]
    processes = []
    for port in ports:
        p = multiprocessing.Process(
            target=server_process,
            args=(port,),
            name=f"ServerProcess-{port}"
        )
        processes.append(p)
        p.start()
    print(f"Started {len(processes)} server processes")
    try:
        # Keep the main process running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down servers...")
        for p in processes:
            p.terminate()
        for p in processes:
            p.join()
        print("All servers have been shut down")
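Note that `HTTPServer` handles one request at a time, so in the listing above each process serves its requests sequentially. If requests inside one process should overlap, the standard library's `ThreadingHTTPServer` (Python 3.7+) handles each request in its own thread. The sketch below is a minimal single-process illustration; `ThreadInfoHandler`, `start_server`, and `fetch` are names invented here, and port 0 is used so that the operating system picks a free port.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class ThreadInfoHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = json.dumps(
            {"path": self.path, "thread": threading.current_thread().name}
        ).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the example quiet


def start_server(port=0):
    """Start the server in a daemon thread; port 0 lets the OS pick a free port."""
    server = ThreadingHTTPServer(("127.0.0.1", port), ThreadInfoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch(server, path):
    """Send one GET request to the server and decode the JSON reply."""
    host, port = server.server_address[:2]
    conn = http.client.HTTPConnection(host, port, timeout=5)
    try:
        conn.request("GET", path)
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()


if __name__ == "__main__":
    server = start_server()
    print(fetch(server, "/hello"))
    server.shutdown()
    server.server_close()
```

Running one such server per child process would combine process-level parallelism across ports with thread-level concurrency inside each process.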

