python使用threading.Condition交替打印兩個字符
Python中使用threading.Condition交替打印兩個字符的程序。
這個程序涉及到兩個線程的的協(xié)調(diào)問題,兩個線程為了能夠相互協(xié)調(diào)運行,必須持有一個共同的狀態(tài),通過這個狀態(tài)來維護兩個線程的執(zhí)行,通過使用threading.Condition對象就能夠完成兩個線程之間的這種協(xié)調(diào)工作。
threading.Condition默認情況下會通過持有一個ReentrantLock來協(xié)調(diào)線程之間的工作,所謂可重入鎖,是只一個可以由一個線程遞歸獲取的鎖,此鎖對象會維護當前鎖的所有者(線程)和當前所有者遞歸獲取鎖的次數(shù)(本文在邏輯上和可重入鎖沒有任何關系,完全可以用一個普通鎖替代)。
Python文檔中給出的描述是:它是一個與某個鎖相聯(lián)系的變量。同時它實現(xiàn)了上下文管理協(xié)議。其對象中除了acquire和release方法之外,其它方法的調(diào)用的前提是,當前線程必須是這個鎖的所有者。
通過代碼和其中的注釋,能夠非常明白地弄清楚Condition的原理是怎樣的:
import threading import time import functools def worker(cond, name): """worker running in different thread""" with cond: # 通過__enter__方法,獲取cond對象中的鎖,默認是一個ReentrantLock對象 print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned())) cond.wait() # 創(chuàng)建一個新的鎖NEWLOCK,調(diào)用acquire將NEWLOCK獲取,然后將NEWLOCK放入等待列表中,\ # 釋放cond._lock鎖(_release_save),最后再次調(diào)用acquire讓NEWLOCK阻塞 print('wait returned in {}'.format(name)) if __name__ == '__main__': condition = threading.Condition() t1 = threading.Thread(target=functools.partial(worker, condition, 't1')) t2 = threading.Thread(target=functools.partial(worker, condition, 't2')) t2.start() # 啟動線程2 t1.start() # 啟動線程1 time.sleep(2) with condition: condition.notify(1) # 按照FIFO順序(wait調(diào)用順序),釋放一個鎖,并將其從等待列表中刪除 time.sleep(2) with condition: condition.notify(1) # 按照FIFO順序(wait調(diào)用順序),釋放另一個鎖,并將其從等待隊列中刪除 t1.join() # 主線程等待子線程結(jié)束 t2.join() # 主線程等待子線程結(jié)束 print('All done')
其輸出為:
...t2-Thread-2-True ...t1-Thread-1-True wait returned in t2 wait returned in t1 All done
其中wait方法要求獲取到threading.Condition對象中的鎖(如果沒有提供,默認使用一個可重入鎖),然后自己創(chuàng)建一個新的普通鎖(NEWLOCK),并獲取這個NEWLOCK;之后調(diào)用_release_save方法釋放threading.Condition對象中的鎖,讓其它線程能夠獲取到;最后再次調(diào)用NEWLOCK上的acquire方法,由于在創(chuàng)建時已經(jīng)acquire過,所以此線程會阻塞在此。而wait想要繼續(xù)執(zhí)行,必須等待其它線程將產(chǎn)生阻塞的這個NEWLOCK給release掉,當然,這就是notify方法的責任了。
notify方法接收一個數(shù)字n,從等待列表中取出相應數(shù)量的等待對象(讓wait方法阻塞的鎖對象),調(diào)用其release方法,讓對應的wait方法能夠返回。而notify_all方法僅僅就是將n設置為等待列表的總長度而已。
在理解了threading.Condition對象中wait和notify的工作原理之后,我們就可以利用它們來實現(xiàn)兩個線程交替打印字符的功能了:
import threading import functools import time def print_a(state): while True: if state.closed: print('Close a') return print('A') time.sleep(2) state.set_current_is_a(True) state.wait_for_b() def print_b(state): while True: if state.closed: print('Close b') return state.wait_for_a() print('B') time.sleep(2) state.set_current_is_a(False) if __name__ == '__main__': class State(object): """state used to coordinate multiple(two here) threads""" def __init__(self): self.condition = threading.Condition() self.current_is_a = False self.closed = False def wait_for_a(self): with self.condition: while not self.current_is_a: self.condition.wait() def wait_for_b(self): with self.condition: while self.current_is_a: self.condition.wait() def set_current_is_a(self, flag): self.current_is_a = flag with self.condition: self.condition.notify_all() state = State() t1 = threading.Thread(target=functools.partial(print_a, state)) t2 = threading.Thread(target=functools.partial(print_b, state)) try: t1.start() t2.start() t1.join() t2.join() except KeyboardInterrupt: state.closed = True print('Closed')
可以看到有兩種類型的任務,一個用于打印字符A,一個用于打印字符B,我們的實現(xiàn)種讓A先于B打印,所以在print_a中,先打印A,再設置當前字符狀態(tài)并釋放等待列表中的所有鎖(set_current_is_a),如果沒有這一步,current_is_a將一直是False,wait_for_b能夠返回,而wait_for_a卻永遠不會返回,最終效果就是每隔兩秒就打印一個字符A,而B永遠不會打印。另一個副作用是如果wait_for_a永遠不會返回,那print_b所在線程的關閉邏輯也就無法執(zhí)行,最終會成為僵尸線程(這里的關閉邏輯只用作示例,生產(chǎn)環(huán)境需要更加完善的關閉機制)。
考慮另一種情況,print_a種將set_current_is_a和wait_for_b交換一下位置會怎么樣。從觀察到的輸出我們看到,程序首先輸出了一個字符A,以后,每隔2秒鐘,就會同時輸出A和B,而不是交替輸出。原因在于,由于current_is_a還是False,我們先調(diào)用的wait_for_b其會立即返回,之后調(diào)用set_current_is_a,將current_is_a設置為True,并釋放所有的阻塞wait的鎖(notify_all),這個過程中沒有阻塞,print_a緊接著進入了下一個打印循環(huán);與此同時,print_b中的wait_for_a也返回了,進入到B的打印循環(huán),故最終我們看到A和B總是一起打印。
可見對于threading.Condition的使用需要多加小心,要注意邏輯上的嚴謹性。
附一個隊列版本:
import threading import functools import time from queue import Queue def print_a(q_a, q_b): while True: char_a = q_a.get() if char_a == 'closed': return print(char_a) time.sleep(2) q_b.put('B') def print_b(q_a, q_b): while True: char_b = q_b.get() if char_b == 'closed': return print(char_b) time.sleep(2) q_a.put('A') if __name__ == '__main__': q_a = Queue() q_b = Queue() t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b)) t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b)) try: t1.start() t2.start() q_a.put('A') t1.join() t2.join() except KeyboardInterrupt: q_a.put('closed') q_b.put('closed') print('Done')
隊列版本邏輯更清晰,更不容易出錯,實際應用中應該選用隊列。
附一個協(xié)程版本(Python 3.5+):
import time import asyncio async def print_a(): while True: print('a') time.sleep(2) # simulate the CPU block time await asyncio.sleep(0) # release control to event loop async def print_b(): while True: print('b') time.sleep(2) # simulate the CPU block time await asyncio.sleep(0) # release control to event loop async def main(): await asyncio.wait([print_a(), print_b()]) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
協(xié)程的運行需要依附于一個事件循環(huán)(select/poll/epoll/kqueue),通過async def將一個函數(shù)定義為協(xié)程,通過await主動讓渡控制權(quán),通過相互讓渡控制權(quán)完成交替打印字符。整個程序運行于一個線程中,這樣就沒有線程間協(xié)調(diào)的工作,僅僅是控制權(quán)的讓渡邏輯。對于IO密集型操作,而沒有明顯的CPU阻塞(計算復雜,以致出現(xiàn)明顯的延時,比如復雜加解密算法)的情況下非常合適。
附一個Java版本:
PrintMain類,用于管理和協(xié)調(diào)打印A和打印B的兩個線程:
package com.cuttyfox.tests.self.version1; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class PrintMain { private boolean currentIsA = false; public synchronized void waitingForPrintingA() throws InterruptedException { while (this.currentIsA == false) { wait(); } } public synchronized void waitingForPrintingB() throws InterruptedException { while (this.currentIsA == true) { wait(); } } public synchronized void setCurrentIsA(boolean flag) { this.currentIsA = flag; notifyAll(); } public static void main(String[] args) throws Exception { PrintMain state = new PrintMain(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(new PrintB(state)); executorService.execute(new PrintA(state)); executorService.shutdown(); executorService.awaitTermination(10, TimeUnit.SECONDS); System.out.println("Done"); System.exit(0); } }
打印A的線程(首先打印A):
package com.cuttyfox.tests.self.version1; import java.util.concurrent.TimeUnit; public class PrintA implements Runnable{ private PrintMain state; public PrintA(PrintMain state) { this.state = state; } public void run() { try { while (!Thread.interrupted()){ System.out.println("Print A"); TimeUnit.SECONDS.sleep(1); this.state.setCurrentIsA(true); this.state.waitingForPrintingB(); } } catch (InterruptedException e) { System.out.println("Exit through Interrupting."); } } }
打印B的線程:
package com.cuttyfox.tests.self.version1; import java.util.concurrent.TimeUnit; public class PrintB implements Runnable{ private PrintMain state; public PrintB(PrintMain state) { this.state = state; } public void run() { try{ while (!Thread.interrupted()) { this.state.waitingForPrintingA(); System.out.println("Print B"); TimeUnit.SECONDS.sleep(1); this.state.setCurrentIsA(false); } } catch (InterruptedException e) { System.out.println("Exit through Interrupting."); } } }
Java對象本身有對象鎖,故這里沒有像Python中那樣需要顯式通過創(chuàng)建一個Condition對象來得到一把鎖。
使用Python實現(xiàn)交替打印abcdef的過程:
import threading import time import functools from collections import deque LETTERS = [chr(code) for code in range(97, 97+6)] LENGTH = len(LETTERS) class State(object): def __init__(self): self.condition = threading.Condition() self.index_value = 0 def set_next_index(self, index): with self.condition: self.index_value = index self.condition.notify_all() def wait_for(self, index_value): with self.condition: while not self.index_value == index_value: self.condition.wait() def print_letter(state: State, wait_ident: int): print('Got: {}!'.format(wait_ident)) while True: state.wait_for(wait_ident) time.sleep(2) print(LETTERS[state.index_value]) print('PRINT: {} AND SET NEXT: {}'.format(state.index_value, (state.index_value + 1) % LENGTH )) state.set_next_index((state.index_value + 1) % LENGTH) state = State() d = deque() d.extend(range(LENGTH)) d.rotate(1) print(d) threads = [] for wait_ident in d: t = threading.Thread(target=functools.partial(print_letter, state, wait_ident)) threads.append(t) for thread in threads: thread.start() for thread in threads: thread.join()
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
python pycharm最新版本激活碼(永久有效)附python安裝教程
PyCharm是一個多功能的集成開發(fā)環(huán)境,只需要在pycharm中創(chuàng)建python file就運行python,并且pycharm內(nèi)置完備的功能,這篇文章給大家介紹python pycharm激活碼最新版,需要的朋友跟隨小編一起看看吧2020-01-01Python Django view 兩種return的實現(xiàn)方式
這篇文章主要介紹了Python Django view 兩種return的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03Python函數(shù)必須先定義,后調(diào)用說明(函數(shù)調(diào)用函數(shù)例外)
這篇文章主要介紹了Python函數(shù)必須先定義,后調(diào)用說明(函數(shù)調(diào)用函數(shù)例外),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06如何打包Python Web項目實現(xiàn)免安裝一鍵啟動的方法
這篇文章主要介紹了如何打包Python Web項目,實現(xiàn)免安裝一鍵啟動,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05Python實現(xiàn)爬蟲設置代理IP和偽裝成瀏覽器的方法分享
今天小編就為大家分享一篇Python實現(xiàn)爬蟲設置代理IP和偽裝成瀏覽器的方法分享,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05django restframework使用redis實現(xiàn)token認證
本文主要介紹了django restframework使用redis實現(xiàn)token認證,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09