Python多進(jìn)程通信Queue、Pipe、Value、Array實例
queue和pipe的區(qū)別: pipe用來在兩個進(jìn)程間通信。queue用來在多個進(jìn)程間實現(xiàn)通信。 此兩種方法為所有系統(tǒng)多進(jìn)程通信的基本方法,幾乎所有的語言都支持此兩種方法。
1)Queue & JoinableQueue
queue用來在進(jìn)程間傳遞消息,任何可以pickle-able的對象都可以在加入到queue。
multiprocessing.JoinableQueue 是 Queue的子類,增加了task_done()和join()方法。
task_done()用來告訴queue一個task完成。一般地在調(diào)用get()獲得一個task,在task結(jié)束后調(diào)用task_done()來通知Queue當(dāng)前task完成。
join() 阻塞直到queue中的所有的task都被處理(即task_done方法被調(diào)用)。
代碼:
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print ('%s: Exiting' % proc_name)
self.task_queue.task_done()
break
print ('%s: %s' % (proc_name, next_task))
answer = next_task() # __call__()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count()
print ('Creating %d consumers' % num_consumers)
consumers = [ Consumer(tasks, results)
for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print ('Result:', result)
num_jobs -= 1
注意小技巧: 使用None來表示task處理完畢。
運(yùn)行結(jié)果:
2)pipe
pipe()返回一對連接對象,代表了pipe的兩端。每個對象都有send()和recv()方法。
代碼:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
p.join()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
3)Value + Array
Value + Array 是python中共享內(nèi)存 映射文件的方法,速度比較快。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = n.value + 1
for i in range(len(a)):
a[i] = a[i] * 10
if __name__ == '__main__':
num = Value('i', 1)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
p2 = Process(target=f, args=(num, arr))
p2.start()
p2.join()
print(num.value)
print(arr[:])
# the output is :
# 2
# [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
# 3
# [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
相關(guān)文章
Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)篩選及提取序列中元素的方法
這篇文章主要介紹了Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)篩選及提取序列中元素的方法,涉及Python列表推導(dǎo)式、生成器表達(dá)式及filter()函數(shù)相關(guān)使用技巧,需要的朋友可以參考下2018-03-03python中24小時制轉(zhuǎn)換為12小時制的方法
最近需要實現(xiàn)一個需求,求用戶輸入24小時制的時間,然后顯示12小時制的時間。具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-06-06詳解python中的三種命令行模塊(sys.argv,argparse,click)
這篇文章主要介紹了python中的三種命令行模塊(sys.argv,argparse,click)的相關(guān)資料,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-12-12python OpenCV的imread不能讀取中文路徑問題及解決
這篇文章主要介紹了python OpenCV的imread不能讀取中文路徑問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07基于 Django 的手機(jī)管理系統(tǒng)實現(xiàn)過程詳解
這篇文章主要介紹了基于 Django 的手機(jī)管理系統(tǒng)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-08-08Pandas?Groupby之在Python中匯總、聚合和分組數(shù)據(jù)的示例詳解
GroupBy是一個非常簡單的概念,我們可以創(chuàng)建一個類別分組,并對這些類別應(yīng)用一個函數(shù),本文給大家介紹Pandas?Groupby之如何在Python中匯總、聚合和分組數(shù)據(jù),感興趣的朋友跟隨小編一起看看吧2023-07-07Python api構(gòu)建tensorrt加速模型的步驟詳解
小編個人認(rèn)為python比c++更容易讀并且已經(jīng)有很多包裝很好的科學(xué)運(yùn)算庫(numpy,scikit等),今天通過本文給大家分享Python api構(gòu)建tensorrt加速模型的步驟,感興趣的朋友一起看看吧2021-09-09