Queue隊(duì)列中join()與task_done()的關(guān)系及說明
join()與task_done()的關(guān)系
在網(wǎng)上大多關(guān)于join()與task_done()的結(jié)束原話是這樣的:
Queue.task_done()
在完成一項(xiàng)工作之后,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)Queue.join()
實(shí)際上意味著等到隊(duì)列為空,再執(zhí)行別的操作
但是可能很多人還是不太理解,這里以我自己的理解來闡述這兩者的關(guān)聯(lián)。
理解
如果線程里每從隊(duì)列里取一次,但沒有執(zhí)行task_done(),則join無法判斷隊(duì)列到底有沒有結(jié)束,在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起。
可以理解為,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,這樣在最后join的時(shí)候根據(jù)隊(duì)列長度是否為零來判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
下面看個(gè)自己寫的例子:
下面這個(gè)例子,會(huì)在join()的地方無限掛起,因?yàn)閖oin在等隊(duì)列清空,但是由于沒有task_done,它認(rèn)為隊(duì)列還沒有清空,還在一直等。
#!/usr/bin/env python # -*- coding:utf-8 -*- '''threading test''' import threading import queue from time import sleep #之所以為什么要用線程,因?yàn)榫€程可以start后繼續(xù)執(zhí)行后面的主線程,可以put數(shù)據(jù),如果不是線程直接在get阻塞。 class Mythread(threading.Thread): def __init__(self,que): threading.Thread.__init__(self) self.queue = que def run(self): while True: sleep(1) if self.queue.empty(): #判斷放到get前面,這樣可以,否則隊(duì)列最后一個(gè)取完后就空了,直接break,走不到print break item = self.queue.get() print(item,'!') #self.queue.task_done() return que = queue.Queue() tasks = [Mythread(que) for x in range(1)] for x in range(10): que.put(x) #快速生產(chǎn) for x in tasks: t = Mythread(que) #把同一個(gè)隊(duì)列傳入2個(gè)線程 t.start() que.join() print('---success---')
如果把self.queue.task_done() 注釋去掉,就會(huì)順利執(zhí)行完主程序。
這就是“ Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)”這句話的意義,能夠讓join()函數(shù)能判斷出隊(duì)列還剩多少,是否清空了。
而事實(shí)上我們看下queue的源碼可以看出確實(shí)是執(zhí)行一次未完成隊(duì)列減一:
def task_done(self): '''Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. ''' with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished
快速生產(chǎn)-快速消費(fèi)
上面的演示代碼是快速生產(chǎn)-慢速消費(fèi)的場(chǎng)景,我們可以直接用task_done()與join()配合,來讓empty()判斷出隊(duì)列是否已經(jīng)結(jié)束。
當(dāng)然,queue我們可以正確判斷是否已經(jīng)清空,但是線程里的get隊(duì)列是不知道,如果沒有東西告訴它,隊(duì)列空了,因此get還會(huì)繼續(xù)阻塞,那么我們就需要在get程序中加一個(gè)判斷,如果empty()成立,break退出循環(huán),否則get()還是會(huì)一直阻塞。
慢速生產(chǎn)-快速消費(fèi)
但是如果生產(chǎn)者速度與消費(fèi)者速度相當(dāng),或者生產(chǎn)速度小于消費(fèi)速度,則靠task_done()來實(shí)現(xiàn)隊(duì)列減一則不靠譜,隊(duì)列會(huì)時(shí)常處于供不應(yīng)求的狀態(tài),常為empty,所以用empty來判斷則不靠譜。
那么這種情況會(huì)導(dǎo)致 join可以判斷出隊(duì)列結(jié)束了,但是線程里不能依靠empty()來判斷線程是否可以結(jié)束。
我們可以在消費(fèi)隊(duì)列的每個(gè)線程最后塞入一個(gè)特定的“標(biāo)記”,在消費(fèi)的時(shí)候判斷,如果get到了這么一個(gè)“標(biāo)記”,則可以判定隊(duì)列結(jié)束了,因?yàn)樯a(chǎn)隊(duì)列都結(jié)束了,也不會(huì)再新增了。
代碼如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- '''threading test''' import threading import queue from time import sleep #之所以為什么要用線程,因?yàn)榫€程可以start后繼續(xù)執(zhí)行后面的主線程,可以put數(shù)據(jù),如果不是線程直接在get阻塞。 class Mythread(threading.Thread): def __init__(self,que): threading.Thread.__init__(self) self.queue = que def run(self): while True: item = self.queue.get() self.queue.task_done() #這里要放到判斷前,否則取最后最后一個(gè)的時(shí)候已經(jīng)為空,直接break,task_done執(zhí)行不了,join()判斷隊(duì)列一直沒結(jié)束 if item == None: break print(item,'!') return que = queue.Queue() tasks = [Mythread(que) for x in range(1)] #快速生產(chǎn) for x in tasks: t = Mythread(que) #把同一個(gè)隊(duì)列傳入2個(gè)線程 t.start() for x in range(10): sleep(1) que.put(x) for x in tasks: que.put(None) que.join() print('---success---')
注意點(diǎn)
put隊(duì)列完成的時(shí)候千萬不能用task_done(),否則會(huì)報(bào)錯(cuò):
task_done() called too many times
因?yàn)樵摲椒▋H僅表示get成功后,執(zhí)行的一個(gè)標(biāo)記。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python實(shí)現(xiàn)構(gòu)建一個(gè)儀表板的示例代碼
這篇文章主要為大家詳細(xì)介紹了Python如何實(shí)現(xiàn)構(gòu)建一個(gè)儀表板,文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴可以了解一下2023-03-03基于深度學(xué)習(xí)和OpenCV實(shí)現(xiàn)目標(biāo)檢測(cè)
這篇文章主要介紹了通過使用OpenCV進(jìn)行基于深度學(xué)習(xí)的對(duì)象檢測(cè)以及使用OpenCV檢測(cè)視頻,文中的示例代碼講解詳細(xì),需要的可以參考一下2021-12-12CPython 垃圾收集器檢測(cè)循環(huán)引用詳解
這篇文章主要為大家介紹了CPython 垃圾收集器檢測(cè)循環(huán)引用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10python項(xiàng)目對(duì)接釘釘SDK的實(shí)現(xiàn)
這篇文章主要介紹了python項(xiàng)目對(duì)接釘釘SDK的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07python學(xué)習(xí)pymongo模塊的使用方法
這篇文章主要介紹了python學(xué)習(xí)pymongo模塊的使用方法,pymongo模塊是python操作mongo數(shù)據(jù)的第三方模塊,總結(jié)一下常用到的簡(jiǎn)單用,需要的小伙伴可以參考一下2022-09-09Python Numpy中數(shù)據(jù)的常用保存與讀取方法
這篇文章主要介紹了Python Numpy中數(shù)據(jù)的常用保存與讀取方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-04-04Python制作動(dòng)態(tài)詞頻條形圖的全過程
說起動(dòng)態(tài)圖表,最火的莫過于動(dòng)態(tài)條形圖了,下面這篇文章主要給大家介紹了關(guān)于Python制作動(dòng)態(tài)詞頻條形圖的全過程,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-11-11