Python如何實現(xiàn)線程間通信
問題
你的程序中有多個線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)
解決方案
從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊列了。創(chuàng)建一個被多個線程共享的 Queue 對象,這些線程通過使用 put() 和 get() 操作來向隊列中添加或者刪除元素。 例如:
from queue import Queue
from threading import Thread
# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(data)
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Queue 對象已經包含了必要的鎖,所以你可以通過它在多個線程間多安全地共享數(shù)據(jù)。 當使用隊列時,協(xié)調生產者和消費者的關閉問題可能會有一些麻煩。一個通用的解決方法是在隊列中放置一個特殊的值,當消費者讀到這個值的時候,終止執(zhí)行。例如:
from queue import Queue
from threading import Thread
# Object that signals shutdown
_sentinel = object()
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
out_q.put(data)
# Put the sentinel on the queue to indicate completion
out_q.put(_sentinel)
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Check for termination
if data is _sentinel:
in_q.put(_sentinel)
break
# Process the data
...
本例中有一個特殊的地方:消費者在讀到這個特殊值之后立即又把它放回到隊列中,將之傳遞下去。這樣,所有監(jiān)聽這個隊列的消費者線程就可以全部關閉了。 盡管隊列是最常見的線程間通信機制,但是仍然可以自己通過創(chuàng)建自己的數(shù)據(jù)結構并添加所需的鎖和同步機制來實現(xiàn)線程間通信。最常見的方法是使用 Condition 變量來包裝你的數(shù)據(jù)結構。下邊這個例子演示了如何創(chuàng)建一個線程安全的優(yōu)先級隊列
import heapq
import threading
class PriorityQueue:
def __init__(self):
self._queue = []
self._count = 0
self._cv = threading.Condition()
def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()
def get(self):
with self._cv:
while len(self._queue) == 0:
self._cv.wait()
return heapq.heappop(self._queue)[-1]
使用隊列來進行線程間通信是一個單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時候接收到的數(shù)據(jù)并開始工作的。不過隊列對象提供一些基本完成的特性,比如下邊這個例子中的 task_done() 和 join() :
from queue import Queue
from threading import Thread
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
out_q.put(data)
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
# Indicate completion
in_q.task_done()
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Wait for all produced items to be consumed
q.join()
如果一個線程需要在一個“消費者”線程處理完特定的數(shù)據(jù)項時立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個 Event 放到一起使用,這樣“生產者”就可以通過這個Event對象來監(jiān)測處理的過程了。示例如下:
from queue import Queue
from threading import Thread, Event
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
# Make an (data, event) pair and hand it to the consumer
evt = Event()
out_q.put((data, evt))
...
# Wait for the consumer to process the item
evt.wait()
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data, evt = in_q.get()
# Process the data
...
# Indicate completion
evt.set()
討論
基于簡單隊列編寫多線程程序在多數(shù)情況下是一個比較明智的選擇。從線程安全隊列的底層實現(xiàn)來看,你無需在你的代碼中使用鎖和其他底層的同步機制,這些只會把你的程序弄得亂七八糟。此外,使用隊列這種基于消息的通信機制可以被擴展到更大的應用范疇,比如,你可以把你的程序放入多個進程甚至是分布式系統(tǒng)而無需改變底層的隊列結構。 使用線程隊列有一個要注意的問題是,向隊列中添加數(shù)據(jù)項時并不會復制此數(shù)據(jù)項,線程間通信實際上是在線程間傳遞對象引用。如果你擔心對象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結構(如:整型、字符串或者元組)或者一個對象的深拷貝。例如:
from queue import Queue
from threading import Thread
import copy
# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(copy.deepcopy(data))
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
Queue 對象提供一些在當前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對象時提供可選的 size 參數(shù)來限制可以添加到隊列中的元素數(shù)量。對于“生產者”與“消費者”速度有差異的情況,為隊列中的元素數(shù)量添加上限是有意義的。比如,一個“生產者”產生項目的速度比“消費者” “消費”的速度快,那么使用固定大小的隊列就可以在隊列已滿的時候阻塞隊列,以免未預期的連鎖效應擴散整個程序造成死鎖或者程序運行失常。在通信的線程之間進行“流量控制”是一個看起來容易實現(xiàn)起來困難的問題。如果你發(fā)現(xiàn)自己曾經試圖通過擺弄隊列大小來解決一個問題,這也許就標志著你的程序可能存在脆弱設計或者固有的可伸縮問題。 get() 和 put() 方法都支持非阻塞方式和設定超時,例如:
import queue q = queue.Queue() try: data = q.get(block=False) except queue.Empty: ... try: q.put(item, block=False) except queue.Full: ... try: data = q.get(timeout=5.0) except queue.Empty: ...
這些操作都可以用來避免當執(zhí)行某些特定隊列操作時發(fā)生無限阻塞的情況,比如,一個非阻塞的 put() 方法和一個固定大小的隊列一起使用,這樣當隊列已滿時就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。
def producer(q):
...
try:
q.put(item, block=False)
except queue.Full:
log.warning('queued item %r discarded!', item)
如果你試圖讓消費者線程在執(zhí)行像 q.get() 這樣的操作時,超時自動終止以便檢查終止標志,你應該使用 q.get() 的可選參數(shù) timeout ,如下:
_running = True
def consumer(q):
while _running:
try:
item = q.get(timeout=5.0)
# Process item
...
except queue.Empty:
pass
最后,有 q.qsize() , q.full() , q.empty() 等實用方法可以獲取一個隊列的當前大小和狀態(tài)。但要注意,這些方法都不是線程安全的??赡苣銓σ粋€隊列使用 empty() 判斷出這個隊列為空,但同時另外一個線程可能已經向這個隊列中插入一個數(shù)據(jù)項。所以,你最好不要在你的代碼中使用這些方法。
以上就是Python如何實現(xiàn)線程間通信的詳細內容,更多關于Python 線程間通信的資料請關注腳本之家其它相關文章!
- Python TCP通信客戶端服務端代碼實例
- Python警察與小偷的實現(xiàn)之一客戶端與服務端通信實例
- Python通過隊列來實現(xiàn)進程間通信的示例
- Python 串口通信的實現(xiàn)
- Python使用socket模塊實現(xiàn)簡單tcp通信
- Python實現(xiàn)UDP程序通信過程圖解
- 如何通過Python3和ssl實現(xiàn)加密通信功能
- Python通過4種方式實現(xiàn)進程數(shù)據(jù)通信
- python實現(xiàn)串口通信的示例代碼
- python實現(xiàn)局域網(wǎng)內實時通信代碼
- python 實現(xiàn)客戶端與服務端的通信
相關文章
解決Pytorch 訓練與測試時爆顯存(out of memory)的問題
今天小編就為大家分享一篇解決Pytorch 訓練與測試時爆顯存(out of memory)的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
matplotlib交互式數(shù)據(jù)光標mpldatacursor的實現(xiàn)
這篇文章主要介紹了matplotlib交互式數(shù)據(jù)光標mpldatacursor的實現(xiàn) ,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-02-02
Python自動化處理Excel數(shù)據(jù)的操作過程
在實際數(shù)據(jù)處理和分析過程中,經常會遇到需要從大量數(shù)據(jù)中提取出特定日期范圍內的信息的需求,本文將介紹如何使用Python的pandas庫來處理Excel文件,感興趣的朋友跟隨小編一起看看吧2023-11-11
Python下使用Scrapy爬取網(wǎng)頁內容的實例
今天小編就為大家分享一篇Python下使用Scrapy爬取網(wǎng)頁內容的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05
python計算書頁碼的統(tǒng)計數(shù)字問題實例
這篇文章主要介紹了python計算書頁碼的統(tǒng)計數(shù)字問題實例,對比2個實例講述了數(shù)字統(tǒng)計的技巧,非常實用,需要的朋友可以參考下2014-09-09

