python使用threading.Condition交替打印兩個(gè)字符
Python中使用threading.Condition交替打印兩個(gè)字符的程序。
這個(gè)程序涉及到兩個(gè)線程的的協(xié)調(diào)問(wèn)題,兩個(gè)線程為了能夠相互協(xié)調(diào)運(yùn)行,必須持有一個(gè)共同的狀態(tài),通過(guò)這個(gè)狀態(tài)來(lái)維護(hù)兩個(gè)線程的執(zhí)行,通過(guò)使用threading.Condition對(duì)象就能夠完成兩個(gè)線程之間的這種協(xié)調(diào)工作。
threading.Condition默認(rèn)情況下會(huì)通過(guò)持有一個(gè)ReentrantLock來(lái)協(xié)調(diào)線程之間的工作,所謂可重入鎖,是只一個(gè)可以由一個(gè)線程遞歸獲取的鎖,此鎖對(duì)象會(huì)維護(hù)當(dāng)前鎖的所有者(線程)和當(dāng)前所有者遞歸獲取鎖的次數(shù)(本文在邏輯上和可重入鎖沒(méi)有任何關(guān)系,完全可以用一個(gè)普通鎖替代)。
Python文檔中給出的描述是:它是一個(gè)與某個(gè)鎖相聯(lián)系的變量。同時(shí)它實(shí)現(xiàn)了上下文管理協(xié)議。其對(duì)象中除了acquire和release方法之外,其它方法的調(diào)用的前提是,當(dāng)前線程必須是這個(gè)鎖的所有者。
通過(guò)代碼和其中的注釋,能夠非常明白地弄清楚Condition的原理是怎樣的:
import threading
import time
import functools
def worker(cond, name):
"""worker running in different thread"""
with cond: # 通過(guò)__enter__方法,獲取cond對(duì)象中的鎖,默認(rèn)是一個(gè)ReentrantLock對(duì)象
print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned()))
cond.wait() # 創(chuàng)建一個(gè)新的鎖NEWLOCK,調(diào)用acquire將NEWLOCK獲取,然后將NEWLOCK放入等待列表中,\
# 釋放cond._lock鎖(_release_save),最后再次調(diào)用acquire讓NEWLOCK阻塞
print('wait returned in {}'.format(name))
if __name__ == '__main__':
condition = threading.Condition()
t1 = threading.Thread(target=functools.partial(worker, condition, 't1'))
t2 = threading.Thread(target=functools.partial(worker, condition, 't2'))
t2.start() # 啟動(dòng)線程2
t1.start() # 啟動(dòng)線程1
time.sleep(2)
with condition:
condition.notify(1) # 按照FIFO順序(wait調(diào)用順序),釋放一個(gè)鎖,并將其從等待列表中刪除
time.sleep(2)
with condition:
condition.notify(1) # 按照FIFO順序(wait調(diào)用順序),釋放另一個(gè)鎖,并將其從等待隊(duì)列中刪除
t1.join() # 主線程等待子線程結(jié)束
t2.join() # 主線程等待子線程結(jié)束
print('All done')
其輸出為:
...t2-Thread-2-True ...t1-Thread-1-True wait returned in t2 wait returned in t1 All done
其中wait方法要求獲取到threading.Condition對(duì)象中的鎖(如果沒(méi)有提供,默認(rèn)使用一個(gè)可重入鎖),然后自己創(chuàng)建一個(gè)新的普通鎖(NEWLOCK),并獲取這個(gè)NEWLOCK;之后調(diào)用_release_save方法釋放threading.Condition對(duì)象中的鎖,讓其它線程能夠獲取到;最后再次調(diào)用NEWLOCK上的acquire方法,由于在創(chuàng)建時(shí)已經(jīng)acquire過(guò),所以此線程會(huì)阻塞在此。而wait想要繼續(xù)執(zhí)行,必須等待其它線程將產(chǎn)生阻塞的這個(gè)NEWLOCK給release掉,當(dāng)然,這就是notify方法的責(zé)任了。
notify方法接收一個(gè)數(shù)字n,從等待列表中取出相應(yīng)數(shù)量的等待對(duì)象(讓wait方法阻塞的鎖對(duì)象),調(diào)用其release方法,讓對(duì)應(yīng)的wait方法能夠返回。而notify_all方法僅僅就是將n設(shè)置為等待列表的總長(zhǎng)度而已。
在理解了threading.Condition對(duì)象中wait和notify的工作原理之后,我們就可以利用它們來(lái)實(shí)現(xiàn)兩個(gè)線程交替打印字符的功能了:
import threading
import functools
import time
def print_a(state):
while True:
if state.closed:
print('Close a')
return
print('A')
time.sleep(2)
state.set_current_is_a(True)
state.wait_for_b()
def print_b(state):
while True:
if state.closed:
print('Close b')
return
state.wait_for_a()
print('B')
time.sleep(2)
state.set_current_is_a(False)
if __name__ == '__main__':
class State(object):
"""state used to coordinate multiple(two here) threads"""
def __init__(self):
self.condition = threading.Condition()
self.current_is_a = False
self.closed = False
def wait_for_a(self):
with self.condition:
while not self.current_is_a:
self.condition.wait()
def wait_for_b(self):
with self.condition:
while self.current_is_a:
self.condition.wait()
def set_current_is_a(self, flag):
self.current_is_a = flag
with self.condition:
self.condition.notify_all()
state = State()
t1 = threading.Thread(target=functools.partial(print_a, state))
t2 = threading.Thread(target=functools.partial(print_b, state))
try:
t1.start()
t2.start()
t1.join()
t2.join()
except KeyboardInterrupt:
state.closed = True
print('Closed')
可以看到有兩種類型的任務(wù),一個(gè)用于打印字符A,一個(gè)用于打印字符B,我們的實(shí)現(xiàn)種讓A先于B打印,所以在print_a中,先打印A,再設(shè)置當(dāng)前字符狀態(tài)并釋放等待列表中的所有鎖(set_current_is_a),如果沒(méi)有這一步,current_is_a將一直是False,wait_for_b能夠返回,而wait_for_a卻永遠(yuǎn)不會(huì)返回,最終效果就是每隔兩秒就打印一個(gè)字符A,而B(niǎo)永遠(yuǎn)不會(huì)打印。另一個(gè)副作用是如果wait_for_a永遠(yuǎn)不會(huì)返回,那print_b所在線程的關(guān)閉邏輯也就無(wú)法執(zhí)行,最終會(huì)成為僵尸線程(這里的關(guān)閉邏輯只用作示例,生產(chǎn)環(huán)境需要更加完善的關(guān)閉機(jī)制)。
考慮另一種情況,print_a種將set_current_is_a和wait_for_b交換一下位置會(huì)怎么樣。從觀察到的輸出我們看到,程序首先輸出了一個(gè)字符A,以后,每隔2秒鐘,就會(huì)同時(shí)輸出A和B,而不是交替輸出。原因在于,由于current_is_a還是False,我們先調(diào)用的wait_for_b其會(huì)立即返回,之后調(diào)用set_current_is_a,將current_is_a設(shè)置為True,并釋放所有的阻塞wait的鎖(notify_all),這個(gè)過(guò)程中沒(méi)有阻塞,print_a緊接著進(jìn)入了下一個(gè)打印循環(huán);與此同時(shí),print_b中的wait_for_a也返回了,進(jìn)入到B的打印循環(huán),故最終我們看到A和B總是一起打印。
可見(jiàn)對(duì)于threading.Condition的使用需要多加小心,要注意邏輯上的嚴(yán)謹(jǐn)性。
附一個(gè)隊(duì)列版本:
import threading
import functools
import time
from queue import Queue
def print_a(q_a, q_b):
while True:
char_a = q_a.get()
if char_a == 'closed':
return
print(char_a)
time.sleep(2)
q_b.put('B')
def print_b(q_a, q_b):
while True:
char_b = q_b.get()
if char_b == 'closed':
return
print(char_b)
time.sleep(2)
q_a.put('A')
if __name__ == '__main__':
q_a = Queue()
q_b = Queue()
t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b))
t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b))
try:
t1.start()
t2.start()
q_a.put('A')
t1.join()
t2.join()
except KeyboardInterrupt:
q_a.put('closed')
q_b.put('closed')
print('Done')
隊(duì)列版本邏輯更清晰,更不容易出錯(cuò),實(shí)際應(yīng)用中應(yīng)該選用隊(duì)列。
附一個(gè)協(xié)程版本(Python 3.5+):
import time
import asyncio
async def print_a():
while True:
print('a')
time.sleep(2) # simulate the CPU block time
await asyncio.sleep(0) # release control to event loop
async def print_b():
while True:
print('b')
time.sleep(2) # simulate the CPU block time
await asyncio.sleep(0) # release control to event loop
async def main():
await asyncio.wait([print_a(), print_b()])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
協(xié)程的運(yùn)行需要依附于一個(gè)事件循環(huán)(select/poll/epoll/kqueue),通過(guò)async def將一個(gè)函數(shù)定義為協(xié)程,通過(guò)await主動(dòng)讓渡控制權(quán),通過(guò)相互讓渡控制權(quán)完成交替打印字符。整個(gè)程序運(yùn)行于一個(gè)線程中,這樣就沒(méi)有線程間協(xié)調(diào)的工作,僅僅是控制權(quán)的讓渡邏輯。對(duì)于IO密集型操作,而沒(méi)有明顯的CPU阻塞(計(jì)算復(fù)雜,以致出現(xiàn)明顯的延時(shí),比如復(fù)雜加解密算法)的情況下非常合適。
附一個(gè)Java版本:
PrintMain類,用于管理和協(xié)調(diào)打印A和打印B的兩個(gè)線程:
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PrintMain {
private boolean currentIsA = false;
public synchronized void waitingForPrintingA() throws InterruptedException {
while (this.currentIsA == false) {
wait();
}
}
public synchronized void waitingForPrintingB() throws InterruptedException {
while (this.currentIsA == true) {
wait();
}
}
public synchronized void setCurrentIsA(boolean flag) {
this.currentIsA = flag;
notifyAll();
}
public static void main(String[] args) throws Exception {
PrintMain state = new PrintMain();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new PrintB(state));
executorService.execute(new PrintA(state));
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Done");
System.exit(0);
}
}
打印A的線程(首先打印A):
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.TimeUnit;
public class PrintA implements Runnable{
private PrintMain state;
public PrintA(PrintMain state) {
this.state = state;
}
public void run() {
try {
while (!Thread.interrupted()){
System.out.println("Print A");
TimeUnit.SECONDS.sleep(1);
this.state.setCurrentIsA(true);
this.state.waitingForPrintingB();
}
} catch (InterruptedException e) {
System.out.println("Exit through Interrupting.");
}
}
}
打印B的線程:
package com.cuttyfox.tests.self.version1;
import java.util.concurrent.TimeUnit;
public class PrintB implements Runnable{
private PrintMain state;
public PrintB(PrintMain state) {
this.state = state;
}
public void run() {
try{
while (!Thread.interrupted()) {
this.state.waitingForPrintingA();
System.out.println("Print B");
TimeUnit.SECONDS.sleep(1);
this.state.setCurrentIsA(false);
}
} catch (InterruptedException e) {
System.out.println("Exit through Interrupting.");
}
}
}
Java對(duì)象本身有對(duì)象鎖,故這里沒(méi)有像Python中那樣需要顯式通過(guò)創(chuàng)建一個(gè)Condition對(duì)象來(lái)得到一把鎖。
使用Python實(shí)現(xiàn)交替打印abcdef的過(guò)程:
import threading
import time
import functools
from collections import deque
LETTERS = [chr(code) for code in range(97, 97+6)]
LENGTH = len(LETTERS)
class State(object):
def __init__(self):
self.condition = threading.Condition()
self.index_value = 0
def set_next_index(self, index):
with self.condition:
self.index_value = index
self.condition.notify_all()
def wait_for(self, index_value):
with self.condition:
while not self.index_value == index_value:
self.condition.wait()
def print_letter(state: State, wait_ident: int):
print('Got: {}!'.format(wait_ident))
while True:
state.wait_for(wait_ident)
time.sleep(2)
print(LETTERS[state.index_value])
print('PRINT: {} AND SET NEXT: {}'.format(state.index_value,
(state.index_value + 1) % LENGTH
))
state.set_next_index((state.index_value + 1) % LENGTH)
state = State()
d = deque()
d.extend(range(LENGTH))
d.rotate(1)
print(d)
threads = []
for wait_ident in d:
t = threading.Thread(target=functools.partial(print_letter, state, wait_ident))
threads.append(t)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python pycharm最新版本激活碼(永久有效)附python安裝教程
PyCharm是一個(gè)多功能的集成開(kāi)發(fā)環(huán)境,只需要在pycharm中創(chuàng)建python file就運(yùn)行python,并且pycharm內(nèi)置完備的功能,這篇文章給大家介紹python pycharm激活碼最新版,需要的朋友跟隨小編一起看看吧2020-01-01
Python Django view 兩種return的實(shí)現(xiàn)方式
這篇文章主要介紹了Python Django view 兩種return的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03
Python函數(shù)必須先定義,后調(diào)用說(shuō)明(函數(shù)調(diào)用函數(shù)例外)
這篇文章主要介紹了Python函數(shù)必須先定義,后調(diào)用說(shuō)明(函數(shù)調(diào)用函數(shù)例外),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06
如何打包Python Web項(xiàng)目實(shí)現(xiàn)免安裝一鍵啟動(dòng)的方法
這篇文章主要介紹了如何打包Python Web項(xiàng)目,實(shí)現(xiàn)免安裝一鍵啟動(dòng),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
Python實(shí)現(xiàn)爬蟲(chóng)設(shè)置代理IP和偽裝成瀏覽器的方法分享
今天小編就為大家分享一篇Python實(shí)現(xiàn)爬蟲(chóng)設(shè)置代理IP和偽裝成瀏覽器的方法分享,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05
django restframework使用redis實(shí)現(xiàn)token認(rèn)證
本文主要介紹了django restframework使用redis實(shí)現(xiàn)token認(rèn)證,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09

