Python實現(xiàn)線程池之線程安全隊列
本文實例為大家分享了Python實現(xiàn)線程池之線程安全隊列的具體代碼,供大家參考,具體內(nèi)容如下
一、線程池組成
一個完整的線程池由下面幾部分組成,線程安全隊列、任務(wù)對象、線程處理對象、線程池對象。其中一個線程安全的隊列是實現(xiàn)線程池和任務(wù)隊列的基礎(chǔ),本節(jié)我們通過threading包中的互斥量threading.Lock()和條件變量threading.Condition()來實現(xiàn)一個簡單的、讀取安全的線程隊列。
二、線程安全隊列的實現(xiàn)
包括put、pop、get等方法,為保證線程安全,讀寫操作時要添加互斥鎖;并且pop操作可以設(shè)置等待時間以阻塞當(dāng)前獲取元素的線程,當(dāng)新元素寫入隊列時通過條件變量通知解除等待操作。
class ThreadSafeQueue(object): ? ? def __init__(self, max_size=0): ? ? ? ? self.queue = [] ? ? ? ? self.max_size = max_size ?# max_size為0表示無限大 ? ? ? ? self.lock = threading.Lock() ?# 互斥量 ? ? ? ? self.condition = threading.Condition() ?# 條件變量 ? ? def size(self): ? ? ? ? """ ? ? ? ? 獲取當(dāng)前隊列的大小 ? ? ? ? :return: 隊列長度 ? ? ? ? """ ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? size = len(self.queue) ? ? ? ? self.lock.release() ? ? ? ? return size ? ? def put(self, item): ? ? ? ? """ ? ? ? ? 將單個元素放入隊列 ? ? ? ? :param item: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? # 隊列已滿 max_size為0表示無限大 ? ? ? ? if self.max_size != 0 and self.size() >= self.max_size: ? ? ? ? ? ? return ThreadSafeException() ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? self.queue.append(item) ? ? ? ? self.lock.release() ? ? ? ? self.condition.acquire() ? ? ? ? # 通知等待讀取的線程 ? ? ? ? self.condition.notify() ? ? ? ? self.condition.release() ? ? ? ? return item ? ? def batch_put(self, item_list): ? ? ? ? """ ? ? ? ? 批量添加元素 ? ? ? ? :param item_list: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if not isinstance(item_list, list): ? ? ? ? ? ? item_list = list(item_list) ? ? ? ? res = [self.put(item) for item in item_list] ? ? ? ? return res ? ? def pop(self, block=False, timeout=0): ? ? ? ? """ ? ? ? ? 從隊列頭部取出元素 ? ? ? ? :param block: 是否阻塞線程 ? ? ? ? :param timeout: 等待時間 ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0: ? ? ? ? ? ? if block: ? ? ? ? ? ? ? ? self.condition.acquire() ? ? ? ? ? ? ? ? self.condition.wait(timeout) ? ? ? ? ? ? ? ? self.condition.release() ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = None ? ? ? ? if len(self.queue): ? ? ? ? ? ? item = self.queue.pop() ? ? ? ? self.lock.release() ? ? ? ? return item ? ? def get(self, index): ? ? ? ? """ ? ? ? ? 獲取指定位置的元素 ? ? ? ? :param index: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0 or index >= self.size(): ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = self.queue[index] ? ? ? ? self.lock.release() ? ? ? ? return item class ThreadSafeException(Exception): ? ? pass
三、測試邏輯
3.1、測試阻塞邏輯
def thread_queue_test_1(): ? ? thread_queue = ThreadSafeQueue(10) ? ? def producer(): ? ? ? ? while True: ? ? ? ? ? ? thread_queue.put(random.randint(0, 10)) ? ? ? ? ? ? time.sleep(2) ? ? def consumer(): ? ? ? ? while True: ? ? ? ? ? ? print('current time before pop is %d' % time.time()) ? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=3) ? ? ? ? ? ? # item = thread_queue.get(2) ? ? ? ? ? ? if item is not None: ? ? ? ? ? ? ? ? print('get value from queue is %s' % item) ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? print(item) ? ? ? ? ? ? print('current time after pop is %d' % time.time()) ? ? t1 = threading.Thread(target=producer) ? ? t2 = threading.Thread(target=consumer) ? ? t1.start() ? ? t2.start() ? ? t1.join() ? ? t2.join()
測試結(jié)果:
我們可以看到生產(chǎn)者線程每隔2s向隊列寫入一個元素,消費(fèi)者線程當(dāng)無數(shù)據(jù)時默認(rèn)阻塞3s。通過執(zhí)行時間發(fā)現(xiàn)消費(fèi)者線程確實發(fā)生了阻塞,當(dāng)生產(chǎn)者寫入數(shù)據(jù)時結(jié)束當(dāng)前等待操作。
3.2、測試讀寫加鎖邏輯
def thread_queue_test_2(): ? ? thread_queue = ThreadSafeQueue(10) ? ? def producer(): ? ? ? ? while True: ? ? ? ? ? ? thread_queue.put(random.randint(0, 10)) ? ? ? ? ? ? time.sleep(2) ? ? def consumer(name): ? ? ? ? while True: ? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=1) ? ? ? ? ? ? # item = thread_queue.get(2) ? ? ? ? ? ? if item is not None: ? ? ? ? ? ? ? ? print('%s get value from queue is %s' % (name, item)) ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? print('%s get value from queue is None' % name) ? ? t1 = threading.Thread(target=producer) ? ? t2 = threading.Thread(target=consumer, args=('thread1',)) ? ? t3 = threading.Thread(target=consumer, args=('thread2',)) ? ? t1.start() ? ? t2.start() ? ? t3.start() ? ? t1.join() ? ? t2.join() ? ? t3.join()
測試結(jié)果:
生產(chǎn)者還是每2s生成一個元素寫入隊列,消費(fèi)者開啟兩個線程進(jìn)行消費(fèi),默認(rèn)阻塞時間為1s,打印結(jié)果顯示通過加鎖確保每次只有一個線程能獲取數(shù)據(jù),保證了線程讀寫的安全。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
對python中raw_input()和input()的用法詳解
下面小編就為大家分享一篇對python中raw_input()和input()的用法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04python初學(xué)者,用python實現(xiàn)基本的學(xué)生管理系統(tǒng)(python3)代碼實例
這篇文章主要介紹了用python實現(xiàn)學(xué)生管理系統(tǒng),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04python [::-1] [::-1,::-1]的具體使用
本文主要介紹了python [::-1] [::-1,::-1]的具體使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Pytorch pth 格式轉(zhuǎn)ONNX 格式的詳細(xì)過程
PyTorch 訓(xùn)練的模型,需要在Jetson nano 上部署,jetson 原生提供了TensorRT 的支持,所以一個比較好的方式是把它轉(zhuǎn)換成ONNX 格式,然后在通過ONNX 轉(zhuǎn)換成TensorRT 格式,這篇文章主要介紹了Pytorch pth 格式轉(zhuǎn)ONNX 格式,需要的朋友可以參考下2023-05-05