Python線程同步的實(shí)現(xiàn)代碼
本文介紹Python中的線程同步對(duì)象,主要涉及 thread 和 threading 模塊。
threading 模塊提供的線程同步原語包括:Lock、RLock、Condition、Event、Semaphore等對(duì)象。
線程執(zhí)行
join與setDaemon
子線程在主線程運(yùn)行結(jié)束后,會(huì)繼續(xù)執(zhí)行完,如果給子線程設(shè)置為守護(hù)線程(setDaemon=True),主線程運(yùn)行結(jié)束子線程即結(jié)束;
如果join()線程,那么主線程會(huì)等待子線程執(zhí)行完再執(zhí)行。
import threading import time def get_thread_a(): print("get thread A started") time.sleep(3) print("get thread A end") def get_thread_b(): print("get thread B started") time.sleep(5) print("get thread B end") if __name__ == "__main__": thread_a = threading.Thread(target=get_thread_a) thread_b = threading.Thread(target=get_thread_b) start_time = time.time() thread_b.setDaemon(True) thread_a.start() thread_b.start() thread_a.join() end_time = time.time() print("execution time: {}".format(end_time - start_time))
thread_a是join,首先子線程thread_a執(zhí)行,thread_b是守護(hù)線程,當(dāng)主線程執(zhí)行完后,thread_b不會(huì)再執(zhí)行執(zhí)行結(jié)果如下:
get thread A started
get thread B started
get thread A end
execution time: 3.003199815750122
線程同步
當(dāng)線程間共享全局變量,多個(gè)線程對(duì)該變量執(zhí)行不同的操作時(shí),該變量最終的結(jié)果可能是不確定的(每次線程執(zhí)行后的結(jié)果不同),如:對(duì)count變量執(zhí)行加減操作 ,count的值是不確定的,要想count的值是一個(gè)確定的需對(duì)線程執(zhí)行的代碼段加鎖。
python對(duì)線程加鎖主要有Lock和Rlock模塊
Lock:
from threading import Lock lock = Lock() lock.acquire() lock.release()
Lock有acquire()和release()方法,這兩個(gè)方法必須是成對(duì)出現(xiàn)的,acquire()后面必須release()后才能再acquire(),否則會(huì)造成死鎖
Rlock:
鑒于Lock可能會(huì)造成死鎖的情況,RLock(可重入鎖)對(duì)Lock進(jìn)行了改進(jìn),RLock可以在同一個(gè)線程里面連續(xù)調(diào)用多次acquire(),但必須再執(zhí)行相同次數(shù)的release()
from threading import RLock lock = RLock() lock.acquire() lock.acquire() lock.release() lock.release()
condition(條件變量),線程在執(zhí)行時(shí),當(dāng)滿足了特定的條件后,才可以訪問相關(guān)的數(shù)據(jù)
import threading def get_thread_a(condition): with condition: condition.wait() print("A : Hello B,that's ok") condition.notify() condition.wait() print("A : I'm fine,and you?") condition.notify() condition.wait() print("A : Nice to meet you") condition.notify() condition.wait() print("A : That's all for today") condition.notify() def get_thread_b(condition): with condition: print("B : Hi A, Let's start the conversation") condition.notify() condition.wait() print("B : How are you") condition.notify() condition.wait() print("B : I'm fine too") condition.notify() condition.wait() print("B : Nice to meet you,too") condition.notify() condition.wait() print("B : Oh,goodbye") if __name__ == "__main__": condition = threading.Condition() thread_a = threading.Thread(target=get_thread_a, args=(condition,)) thread_b = threading.Thread(target=get_thread_b, args=(condition,)) thread_a.start() thread_b.start()
Condition內(nèi)部有一把鎖,默認(rèn)是RLock,在調(diào)用wait()和notify()之前必須先調(diào)用acquire()獲取這個(gè)鎖,才能繼續(xù)執(zhí)行;當(dāng)wait()和notify()執(zhí)行完后,需調(diào)用release()釋放這個(gè)鎖,在執(zhí)行with condition時(shí),會(huì)先執(zhí)行acquire(),with結(jié)束時(shí),執(zhí)行了release();所以condition有兩層鎖,最底層鎖在調(diào)用wait()時(shí)會(huì)釋放,同時(shí)會(huì)加一把鎖到等待隊(duì)列,等待notify()喚醒釋放鎖
wait() :允許等待某個(gè)條件變量的通知,notify()可喚醒
notify(): 喚醒等待隊(duì)列wait()
執(zhí)行結(jié)果:
B : Hi A, Let's start the conversation
A : Hello B,that's ok
B : How are you
A : I'm fine,and you?
B : I'm fine too
A : Nice to meet you
B : Nice to meet you,too
A : That's all for today
B : Oh,goodbye
Semaphore(信號(hào)量)
用于控制線程的并發(fā)數(shù),如爬蟲中請(qǐng)求次數(shù)過于頻繁會(huì)被禁止ip,每次控制爬取網(wǎng)頁的線程數(shù)量可在一定程度上防止ip被禁;文件讀寫中,控制寫線程每次只有一個(gè),讀線程可多個(gè)。
import time import threading def get_thread_a(semaphore,i): time.sleep(1) print("get thread : {}".format(i)) semaphore.release() def get_thread_b(semaphore): for i in range(10): semaphore.acquire() thread_a = threading.Thread(target=get_thread_a, args=(semaphore,i)) thread_a.start() if __name__ == "__main__": semaphore = threading.Semaphore(2) thread_b = threading.Thread(target=get_thread_b, args=(semaphore,)) thread_b.start()
上述示例了每隔1秒并發(fā)兩個(gè)線程執(zhí)行的情況,當(dāng)調(diào)用一次semaphore.acquire()時(shí),Semaphore的數(shù)量就減1,直至Semaphore數(shù)量為0時(shí)被鎖上,當(dāng)release()后Semaphore數(shù)量加1。Semaphore在本質(zhì)上是調(diào)用的Condition,semaphore.acquire()在Semaphore的值為0的條件下會(huì)調(diào)用Condition.wait(), 否則將值減1,semaphore.release()會(huì)將Semaphore的值加1,并調(diào)用Condition.notify()
Semaphore源碼
def acquire(self, blocking=True, timeout=None): if not blocking and timeout is not None: raise ValueError("can't specify timeout for non-blocking acquire") rc = False endtime = None with self._cond: while self._value == 0: if not blocking: break if timeout is not None: if endtime is None: endtime = _time() + timeout else: timeout = endtime - _time() if timeout <= 0: break self._cond.wait(timeout) else: self._value -= 1 rc = True return rc def release(self): with self._cond: self._value += 1 self._cond.notify()
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Python基礎(chǔ)之字符串常見操作經(jīng)典實(shí)例詳解
這篇文章主要介紹了Python基礎(chǔ)之字符串常見操作,結(jié)合實(shí)例形式詳細(xì)分析了Python字符串操作基本函數(shù)、功能、使用方法及操作注意事項(xiàng),需要的朋友可以參考下2020-02-02基于Python實(shí)現(xiàn)簡(jiǎn)單的漢字拼音轉(zhuǎn)換工具
將漢字轉(zhuǎn)為拼音,可以用于批量漢字注音、文字排序、拼音檢索文字等常見場(chǎng)景?,F(xiàn)在互聯(lián)網(wǎng)上有許多拼音轉(zhuǎn)換工具,基于Python的開源模塊也不少,本文將利用pypinyin模塊制作簡(jiǎn)單的漢字拼音轉(zhuǎn)換工具,感興趣的可以了解一下2022-09-09使用Python腳本實(shí)現(xiàn)批量網(wǎng)站存活檢測(cè)遇到問題及解決方法
本文是小編自己編寫的一個(gè)使用python實(shí)現(xiàn)批量網(wǎng)站存活檢測(cè)。在項(xiàng)目測(cè)試中非常有用。本文給大家分享了遇到的問題及解決方案,非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧2016-10-10Python退出While循環(huán)的3種方法舉例詳解
在每次循環(huán)結(jié)束后,我們需要檢查循環(huán)條件是否滿足。如果條件滿足,則繼續(xù)執(zhí)行循環(huán)體內(nèi)的代碼,否則退出循環(huán),這篇文章主要給大家介紹了關(guān)于Python退出While循環(huán)的3種方法,需要的朋友可以參考下2023-10-10用OpenCV進(jìn)行年齡和性別檢測(cè)的實(shí)現(xiàn)示例
這篇文章主要介紹了用 OpenCV 進(jìn)行年齡和性別檢測(cè)的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎
本篇文章主要介紹了詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-08-08