Python并發(fā)編程線程消息通信機制詳解
前面我已經(jīng)向大家介紹了,如何使用創(chuàng)建線程,啟動線程。相信大家都會有這樣一個想法,線程無非就是創(chuàng)建一下,然后再start()下,實在是太簡單了。
可是要知道,在真實的項目中,實際場景可要我們舉的例子要復雜的多得多,不同線程的執(zhí)行可能是有順序的,或者說他們的執(zhí)行是有條件的,是要受控制的。如果僅僅依靠前面學的那點淺薄的知識,是遠遠不夠的。
那今天,我們就來探討一下如何控制線程的觸發(fā)執(zhí)行。
要實現(xiàn)對多個線程進行控制,其實本質(zhì)上就是消息通信機制在起作用,利用這個機制發(fā)送指令,告訴線程,什么時候可以執(zhí)行,什么時候不可以執(zhí)行,執(zhí)行什么內(nèi)容。
經(jīng)過我的總結,線程中通信方法大致有如下三種:
threading.Event
threading.Condition
queue.Queue
接下來我們來一一探討下。
1 Event事件
Python提供了非常簡單的通信機制 Threading.Event,通用的條件變量。多個線程可以等待某個事件的發(fā)生,在事件發(fā)生后,所有的線程都會被激活。
關于Event的使用也超級簡單,就三個函數(shù)
event = threading.Event() # 重置event,使得所有該event事件都處于待命狀態(tài) event.clear() # 等待接收event的指令,決定是否阻塞程序執(zhí)行 event.wait() # 發(fā)送event指令,使所有設置該event事件的線程執(zhí)行 event.set()
舉個例子來看下。
import time
import threading
class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event
def run(self):
print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
# 等待event.set()后,才能往下執(zhí)行
self.event.wait()
print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))
threads = []
event = threading.Event()
# 定義五個線程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]
# 重置event,使得event.wait()起到阻塞作用
event.clear()
# 啟動所有線程
[t.start() for t in threads]
print('等待5s...')
time.sleep(5)
print('喚醒所有線程...')
event.set()
Thread: 1 start at Sun May 13 20:38:08 2018 Thread: 2 start at Sun May 13 20:38:08 2018 Thread: 3 start at Sun May 13 20:38:08 2018 Thread: 4 start at Sun May 13 20:38:08 2018 等待5s... 喚醒所有線程... Thread: 1 finish at Sun May 13 20:38:13 2018 Thread: 4 finish at Sun May 13 20:38:13 2018 Thread: 2 finish at Sun May 13 20:38:13 2018 Thread: 3 finish at Sun May 13 20:38:13 2018
可見在所有線程都啟動(start())后,并不會執(zhí)行完,而是都在self.event.wait()止住了,需要我們通過event.set()來給所有線程發(fā)送執(zhí)行指令才能往下執(zhí)行。
2 Condition
Condition和Event 是類似的,并沒有多大區(qū)別。
同樣,Condition也只需要掌握幾個函數(shù)即可。
cond = threading.Condition() # 類似lock.acquire() cond.acquire() # 類似lock.release() cond.release() # 等待指定觸發(fā),同時會釋放對鎖的獲取,直到被notify才重新占有瑣。 cond.wait() # 發(fā)送指定,觸發(fā)執(zhí)行 cond.notify()
舉個網(wǎng)上一個比較趣的捉迷藏的例子來看看
import threading, time
class Hider(threading.Thread):
def __init__(self, cond, name):
super(Hider, self).__init__()
self.cond = cond
self.name = name
def run(self):
time.sleep(1) #確保先運行Seeker中的方法
self.cond.acquire()
print(self.name + ': 我已經(jīng)把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我找到你了哦 ~_~')
self.cond.notify()
self.cond.release()
print(self.name + ': 我贏了')
class Seeker(threading.Thread):
def __init__(self, cond, name):
super(Seeker, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait()
print(self.name + ': 我已經(jīng)藏好了,你快來找我吧')
self.cond.notify()
self.cond.wait()
self.cond.release()
print(self.name + ': 被你找到了,哎~~~')
cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()
通過cond來通信,阻塞自己,并使對方執(zhí)行。從而,達到有順序的執(zhí)行。
看下結果
hider: 我已經(jīng)把眼睛蒙上了 seeker: 我已經(jīng)藏好了,你快來找我吧 hider: 我找到你了 ~_~ hider: 我贏了 seeker: 被你找到了,哎~~~
3 Queue隊列
最后一個,隊列,它是本節(jié)的重點,因為它是我們?nèi)粘i_發(fā)中最使用頻率最高的。
從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊列了。創(chuàng)建一個被多個線程共享的 Queue 對象,這些線程通過使用put() 和 get() 操作來向隊列中發(fā)送和獲取元素。
同樣,對于Queue,我們也只需要掌握幾個函數(shù)即可。
from queue import Queue # maxsize默認為0,不受限 # 一旦>0,而消息數(shù)又達到限制,q.put()也將阻塞 q = Queue(maxsize=0) # 默認阻塞程序,等待隊列消息,可設置超時時間 q.get(block=True, timeout=None) # 發(fā)送消息:默認會阻塞程序至隊列中有空閑位置放入數(shù)據(jù) q.put(item, block=True, timeout=None) # 等待所有的消息都被消費完 q.join() # 通知隊列任務處理已經(jīng)完成,當所有任務都處理完成時,join() 阻塞將會解除 q.task_done()
以下三個方法,知道就好,一般不需要使用
# 查詢當前隊列的消息個數(shù) q.qsize() # 隊列消息是否都被消費完,返回 True/False q.empty() # 檢測隊列里消息是否已滿 q.full()
函數(shù)會比之前的多一些,同時也從另一方面說明了其功能更加豐富。
我來舉個老師點名的例子。
# coding=utf-8
# /usr/bin/env python
'''
Author: wangbm
Email: wongbingming@163.com
Wechat: mrbensonwon
Blog: python-online.cn
公眾號:Python編程時光
date: 2020/9/20 下午7:30
desc:
'''
__author__ = 'wangbm'
from queue import Queue
from threading import Thread
import time
class Student:
def __init__(self, name):
self.name = name
def speak(self):
print("{}:到!".format(self.name))
class Teacher:
def __init__(self, queue):
super().__init__()
self.queue=queue
def call(self, student_name):
if student_name == "exit":
print("點名結束,開始上課..")
else:
print("老師:{}來了沒?".format(student_name))
# 發(fā)送消息,要點誰的名
self.queue.put(student_name)
class CallManager(Thread):
def __init__(self, queue):
super().__init__()
self.students = {}
self.queue = queue
def put(self, student):
self.students.setdefault(student.name, student)
def run(self):
while True:
# 阻塞程序,時刻監(jiān)聽老師,接收消息
student_name = queue.get()
if student_name == "exit":
break
elif student_name in self.students:
self.students[student_name].speak()
else:
print("老師,咱班,沒有 {} 這個人".format(student_name))
queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明")
s2 = Student(name="小亮")
cm = CallManager(queue)
cm.put(s1)
cm.put(s2)
cm.start()
print('開始點名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')
time.sleep(1)
teacher.call("exit")
運行結果如下
開始點名~
老師:小明來了沒?
小明:到!
老師:小亮來了沒?
小亮:到!
點名結束,開始上課..
其實 queue 還有一個很重要的方法,Queue.task_done()
如果不明白它的原理,我們在寫程序,就很有可能卡死。
當我們使用 Queue.get() 從隊列取出數(shù)據(jù)后,這個數(shù)據(jù)有沒有被正常消費,是很重要的。
如果數(shù)據(jù)沒有被正常消費,那么Queue會認為這個任務還在執(zhí)行中,此時你使用 Queue.join() 會一直阻塞,即使此時你的隊列里已經(jīng)沒有消息了。
那么如何解決這種一直阻塞的問題呢?
就是在我們正常消費完數(shù)據(jù)后,記得調(diào)用一下 Queue.task_done(),說明隊列這個任務已經(jīng)結束了。
當隊列內(nèi)部的任務計數(shù)器歸于零時,調(diào)用 Queue.join() 就不會再阻塞了。
要理解這個過程,請參考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定義線程池的的例子。
4 總結一下
學習了以上三種通信方法,我們很容易就能發(fā)現(xiàn)Event 和 Condition 是threading模塊原生提供的模塊,原理簡單,功能單一,它能發(fā)送 True 和 False 的指令,所以只能適用于某些簡單的場景中。
而Queue則是比較高級的模塊,它可能發(fā)送任何類型的消息,包括字符串、字典等。其內(nèi)部實現(xiàn)其實也引用了Condition模塊(譬如put和get函數(shù)的阻塞),正是其對Condition進行了功能擴展,所以功能更加豐富,更能滿足實際應用。
以上就是Python并發(fā)編程線程消息通信機制詳解的詳細內(nèi)容,更多關于Python并發(fā)線程消息通信機制的資料請關注腳本之家其它相關文章!
相關文章
python之線程通過信號pyqtSignal刷新ui的方法
今天小編就為大家分享一篇python之線程通過信號pyqtSignal刷新ui的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01

