淺析python多線程中的鎖
Python 中多線程主要有以下幾種類(lèi)型的鎖:
- threading.Lock:這是最基礎(chǔ)的鎖對(duì)象,不屬于任何線程,在Python中,它是一個(gè)同步原語(yǔ)對(duì)象。一次只有一個(gè)線程可以獲得鎖。如果一個(gè)線程試圖獲得一個(gè)已經(jīng)被其他線程拿著的鎖,那么這個(gè)線程就會(huì)被阻塞,直到擁有鎖的線程釋放。
- threading.RLock:可重入鎖(RLock),允許同一線程多次獲取鎖,但是每次獲取鎖都需要釋放,通常用于遞歸函數(shù)。如果使用Lock,那么在同一個(gè)線程多次獲取鎖時(shí)會(huì)產(chǎn)生死鎖。
- threading.Semaphore:信號(hào)量,允許一定數(shù)量的線程同時(shí)獲取鎖。例如,如果你有一些資源,每次可以被5個(gè)線程同時(shí)訪問(wèn),那么你就可以使用一個(gè)初始化為5的信號(hào)量。
- threading.BoundedSemaphore:有界信號(hào)量,與Semaphore類(lèi)似,但是在釋放鎖的時(shí)候,會(huì)檢查當(dāng)前的信號(hào)量的值是否超過(guò)了初始值,如果超過(guò)了會(huì)拋出一個(gè)異常。
- threading.Condition:條件變量,允許一個(gè)或多個(gè)線程等待某個(gè)條件滿足,然后喚醒。
- threading.Event:事件對(duì)象。事件對(duì)象內(nèi)部有一個(gè)標(biāo)志位,初始為False。如果標(biāo)志位為T(mén)rue,wait()不做任何事情,如果標(biāo)志位為False,wait()會(huì)阻塞??梢酝ㄟ^(guò)set()來(lái)設(shè)置標(biāo)志位,clear()來(lái)清除標(biāo)志位。
- threading.Barrier:柵欄對(duì)象,允許一定數(shù)量的線程同步,直到所有線程都到達(dá)柵欄位置,才會(huì)全部釋放。
注意,在Python中,由于全局解釋器鎖(GIL)的存在,同一時(shí)刻只允許一個(gè)線程執(zhí)行Python字節(jié)碼,因此Python的多線程并不能實(shí)現(xiàn)真正的并行計(jì)算。如果需要進(jìn)行并行計(jì)算,可以使用multiprocessing
模塊,或者使用其他的并行計(jì)算框架,如concurrent.futures
。
threading.Lock():
線程鎖,可用于同步多個(gè)線程對(duì)共享資源的訪問(wèn)。
錯(cuò)誤示范:
存在一個(gè)問(wèn)題,就是在每次循環(huán)的時(shí)候都創(chuàng)建了一個(gè)新的鎖。這樣的話,每個(gè)鎖都是獨(dú)立的,它們之間不能保證對(duì)共享資源 counter
的互斥訪問(wèn),因此在多線程環(huán)境下,counter += 1
操作可能會(huì)出現(xiàn)競(jìng)態(tài)條件,導(dǎo)致結(jié)果不是預(yù)期的 10000000
。
import threading counter = 0 # 共享資源 def increment(): global counter for _ in range(1000000): with threading.Lock(): # 獲取線程鎖 counter += 1 threads = [threading.Thread(target=increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(counter) # 輸出 10000000
正確示例
import threading counter = 0 # 共享資源 lock = threading.Lock() # 創(chuàng)建一個(gè)共享的鎖 def increment(): global counter for _ in range(1000000): with lock: # 獲取線程鎖 counter += 1 threads = [threading.Thread(target=increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(counter) # 輸出應(yīng)該是10000000
threading.RLock()
可重入鎖,是一種特殊的線程鎖,允許同一線程多次獲得鎖。
import threading # 創(chuàng)建一個(gè)RLock對(duì)象 lock = threading.RLock() def recursive_function(level): with lock: if level > 0: print('Entering level', level) recursive_function(level - 1) print('Exiting level', level) else: print('Base case') # 在主線程中運(yùn)行遞歸函數(shù) recursive_function(5)
在這個(gè)例子中,recursive_function
是一個(gè)遞歸函數(shù),它會(huì)在每個(gè)遞歸級(jí)別上獲取一次鎖。由于我們使用的是 threading.RLock
,同一線程可以多次獲取鎖,所以這個(gè)代碼能夠正常運(yùn)行。
但是如果我們使用 threading.Lock
替代 threading.RLock
,那么在第二次嘗試獲取鎖時(shí),線程將被阻塞,因?yàn)?threading.Lock
不允許同一線程多次獲取鎖。這會(huì)導(dǎo)致死鎖,程序?qū)o(wú)法繼續(xù)運(yùn)行。
threading.RLock
(Reentrant Lock)在Python中是一種可重入鎖,也就是說(shuō),它允許同一線程在沒(méi)有釋放其所有權(quán)的情況下多次獲取同一個(gè)鎖。這在某些情況下是非常有用的,例如在遞歸函數(shù)或者嵌套調(diào)用中。
以下是一些具體的應(yīng)用場(chǎng)景:
- 遞歸函數(shù):在遞歸函數(shù)中,同一個(gè)線程可能需要多次獲取同一個(gè)鎖。如果使用普通的
threading.Lock
,那么在第二次嘗試獲取鎖的時(shí)候,線程將會(huì)阻塞,因?yàn)槠胀ǖ逆i不允許同一線程多次獲取。而threading.RLock
則允許這種行為。 - 復(fù)雜的同步問(wèn)題:在某些復(fù)雜的同步問(wèn)題中,你可能需要在同一線程中的多個(gè)函數(shù)或者多個(gè)不同的代碼塊中獲取同一個(gè)鎖。如果使用普通的
threading.Lock
,那么你需要確保在每次獲取鎖之前都已經(jīng)釋放了鎖,這在某些情況下可能會(huì)很復(fù)雜。而threading.RLock
則可以簡(jiǎn)化這種情況的處理。 - 實(shí)現(xiàn)高級(jí)同步原語(yǔ):你可以使用
threading.RLock
來(lái)實(shí)現(xiàn)一些更高級(jí)的同步原語(yǔ),例如讀寫(xiě)鎖(ReadWriteLock
)。在這種情況下,讀鎖可以被同一線程多次獲取,而寫(xiě)鎖只能被同一線程獲取一次。你可以使用threading.RLock
來(lái)實(shí)現(xiàn)讀鎖的行為。
需要注意的是,雖然 threading.RLock
在某些情況下非常有用,但是在大多數(shù)情況下,你仍然應(yīng)該使用更簡(jiǎn)單的 threading.Lock
。因?yàn)檫^(guò)度使用 threading.RLock
可能會(huì)使你的代碼更復(fù)雜,更難以理解和維護(hù)。而且,不正確的使用 threading.RLock
可能會(huì)導(dǎo)致死鎖。
threading.Semaphore():
信號(hào)量,用于限制同時(shí)訪問(wèn)某一資源的線程數(shù)量。
模擬一個(gè)有限資源池(例如數(shù)據(jù)庫(kù)連接池),限制同時(shí)訪問(wèn)資源的線程數(shù)量。
import threading import time # 定義一個(gè)有限資源池 RESOURCE_POOL_SIZE = 3 semaphore = threading.Semaphore(RESOURCE_POOL_SIZE) def access_resource(thread_id): print(f"Thread {thread_id} is requesting access to the resource pool.") with semaphore: print(f"Thread {thread_id} has acquired access to the resource pool.") time.sleep(1) # 模擬資源使用 print(f"Thread {thread_id} has released access to the resource pool.") # 創(chuàng)建10個(gè)線程 threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join()
在這個(gè)例子中,我們有一個(gè)有限的資源池,其大小由 RESOURCE_POOL_SIZE
定義。我們使用一個(gè) threading.Semaphore
對(duì)象,將其初始值設(shè)為資源池的大小,以限制同時(shí)訪問(wèn)資源池的線程數(shù)量。
我們創(chuàng)建了10個(gè)線程,每個(gè)線程都嘗試訪問(wèn)資源池。由于我們使用了信號(hào)量,一次只能有 RESOURCE_POOL_SIZE
個(gè)線程同時(shí)訪問(wèn)資源池。其他線程將等待,直到有線程釋放資源。這樣,我們可以限制同時(shí)訪問(wèn)資源的線程數(shù)量,防止資源競(jìng)爭(zhēng)或過(guò)載。
threading.BoundedSemaphore:
有界信號(hào)量
import threading # 創(chuàng)建一個(gè)有界信號(hào)量,初始值為2 semaphore = threading.BoundedSemaphore(2) def access_resource(thread_id): print(f"Thread {thread_id} is requesting access to the resource.") with semaphore: print(f"Thread {thread_id} has acquired access to the resource.") # 模擬資源使用 # 創(chuàng)建3個(gè)線程 threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(3)] for t in threads: t.start() for t in threads: t.join() # 嘗試釋放未獲取的信號(hào)量,將引發(fā) ValueError try: semaphore.release() except ValueError as e: print("Caught exception:", e)
在這個(gè)代碼段中,semaphore.release()
嘗試釋放(即增加)信號(hào)量的計(jì)數(shù)。在Python中,信號(hào)量是一個(gè)用來(lái)限制線程并發(fā)數(shù)量的同步原語(yǔ),它內(nèi)部有一個(gè)計(jì)數(shù)器。當(dāng)一個(gè)線程調(diào)用 acquire()
方法時(shí),信號(hào)量的計(jì)數(shù)器減一;當(dāng)一個(gè)線程調(diào)用 release()
方法時(shí),信號(hào)量的計(jì)數(shù)器加一。
threading.BoundedSemaphore
與 threading.Semaphore
的行為基本相同,但有一點(diǎn)不同:如果在調(diào)用 release()
后,信號(hào)量的計(jì)數(shù)器的值大于創(chuàng)建信號(hào)量時(shí)設(shè)定的初始值,threading.BoundedSemaphore
將拋出 ValueError
異常。這種行為有助于檢測(cè)程序中的一些錯(cuò)誤,例如錯(cuò)誤地多次釋放了信號(hào)量。
所以在這個(gè)示例中,try
/except
塊是為了捕獲并處理可能由 semaphore.release()
拋出的 ValueError
異常。如果在調(diào)用 release()
后,信號(hào)量的計(jì)數(shù)器的值大于創(chuàng)建信號(hào)量時(shí)設(shè)定的初始值,那么將會(huì)捕獲到 ValueError
,并打印出 “Caught exception:” 及其錯(cuò)誤信息。
threading.Condition:
條件變量,允許一個(gè)或多個(gè)線程等待某個(gè)條件滿足,然后喚醒。
import threading import time # 創(chuàng)建一個(gè)條件變量 condition = threading.Condition() # 創(chuàng)建一個(gè)共享資源 resource = [] def producer(): for i in range(5): time.sleep(1) # 模擬生產(chǎn)過(guò)程 with condition: resource.append(i) # 向資源中添加數(shù)據(jù) condition.notify() # 喚醒一個(gè)等待的線程 def consumer(): while True: with condition: while not resource: # 如果資源為空,則等待 condition.wait() item = resource.pop(0) # 從資源中獲取數(shù)據(jù) print(f"Consumer consumed: {item}") if item == 4: # 如果消費(fèi)了所有的資源,就退出循環(huán) break # 創(chuàng)建一個(gè)生產(chǎn)者線程和一個(gè)消費(fèi)者線程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) producer_thread.start() consumer_thread.start() producer_thread.join() consumer_thread.join()
threading.Condition
對(duì)象常常與一個(gè)鎖一起使用,允許一個(gè)或多個(gè)線程等待直到滿足某個(gè)特定的條件,然后喚醒。這在某些情況下很有用,比如當(dāng)你需要一個(gè)或多個(gè)線程等待直到其他線程完成特定任務(wù)或者改變了某個(gè)狀態(tài)。
threading.Barrier:
柵欄對(duì)象,允許一定數(shù)量的線程同步,直到所有線程都到達(dá)柵欄位置,才會(huì)全部釋放
import threading import time # 設(shè)定柵欄,允許3個(gè)線程進(jìn)行同步 barrier = threading.Barrier(3) def worker(thread_id): print(f"Thread {thread_id} is starting.") time.sleep(thread_id) # 模擬線程執(zhí)行過(guò)程中的延時(shí) print(f"Thread {thread_id} is waiting at the barrier.") barrier.wait() # 等待所有線程到達(dá)柵欄 print(f"Thread {thread_id} is released from the barrier.") # 創(chuàng)建3個(gè)線程 threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)] for t in threads: t.start() for t in threads: t.join()
在這個(gè)例子中,我們創(chuàng)建了一個(gè)柵欄對(duì)象,允許3個(gè)線程進(jìn)行同步。然后我們創(chuàng)建了3個(gè)線程,每個(gè)線程都會(huì)執(zhí)行 worker
函數(shù)。在 worker
函數(shù)中,線程首先打印一個(gè)開(kāi)始信息,然后等待一段時(shí)間(這里是線程ID,可以模擬線程執(zhí)行過(guò)程中的延時(shí)),接著打印一個(gè)等待信息,然后調(diào)用 barrier.wait()
等待柵欄。
當(dāng)所有3個(gè)線程都調(diào)用了 barrier.wait()
,柵欄將釋放所有等待的線程。這時(shí),線程將繼續(xù)執(zhí)行并打印它們已經(jīng)從柵欄中釋放的信息。這個(gè)示例演示了如何使用 threading.Barrier
對(duì)象來(lái)同步一組線程,確保它們?cè)谀硞€(gè)點(diǎn)上相互等待。
到此這篇關(guān)于淺析python多線程中的鎖的文章就介紹到這了,更多相關(guān)python多線程鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
linux系統(tǒng)使用python監(jiān)測(cè)網(wǎng)絡(luò)接口獲取網(wǎng)絡(luò)的輸入輸出
這篇文章主要介紹了linux系統(tǒng)使用python監(jiān)測(cè)網(wǎng)絡(luò)接口獲取網(wǎng)絡(luò)的輸入輸出信息,大家參考使用吧2014-01-01使用matplotlib繪制熱圖(heatmap)全過(guò)程
這篇文章主要介紹了使用matplotlib繪制熱圖(heatmap)全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12Python中type的構(gòu)造函數(shù)參數(shù)含義說(shuō)明
這篇文章主要介紹了Python中type的構(gòu)造函數(shù)參數(shù)含義說(shuō)明,本文用一個(gè)編碼實(shí)例解釋Python type的參數(shù)的作用和含義,需要的朋友可以參考下2015-06-06淺談Python3中strip()、lstrip()、rstrip()用法詳解
這篇文章主要介紹了淺談Python3中strip()、lstrip()、rstrip()用法詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-04-04利用python中集合的唯一性實(shí)現(xiàn)去重
集合,用{ }表示,集合中所有元素具有唯一性。這篇文章給大家介紹利用python中集合的唯一性實(shí)現(xiàn)去重,感興趣的朋友一起看看吧2020-02-02Python爬蟲(chóng)requests庫(kù)多種用法實(shí)例
這篇文章主要介紹了Python爬蟲(chóng)requests庫(kù)多種用法實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05Python靜態(tài)Web服務(wù)器面向?qū)ο筇幚砜蛻舳苏?qǐng)求
這篇文章主要為大家介紹了Python面向?qū)ο髮?shí)現(xiàn)靜態(tài)Web服務(wù)器處理客戶端請(qǐng)求示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06