Python3多線程詳解
為什么要使用多線程?
使用多線程,可以同時進行多項任務,可以使用戶界面更友好,還可以后臺執(zhí)行某些用時長的任務,同時具有易于通信的優(yōu)點。
python3中多線程的實現(xiàn)使用了threading模塊,它允許同一進程中運行多個線程。
如何創(chuàng)建和執(zhí)行一個線程
一般我們有兩種方法來創(chuàng)建線程,一種是以某個函數(shù)來作為起點,另一種是繼承Thread類。
方法一
獲取一個Thread對象,構造參數(shù)中target是起點函數(shù),注意不要加括號
。假如起點函數(shù)有參數(shù),則可以通過args輸入元組參數(shù)或者kwargs輸入字典參數(shù)。
#! -*-conding=: UTF-8 -*- # 2023/5/6 15:53 import time from threading import Thread def task(): print("另外開始一個子線程做任務啦") time.sleep(1) # 用time.sleep模擬任務耗時 print("子線程任務結束啦") if __name__ == '__main__': print("這里是主線程") # 創(chuàng)建線程對象 t1 = Thread(target=task) # 啟動 t1.start() time.sleep(0.3) print("主線程依然可以干別的事")
輸出結果為:
這里是主線程
另外開始一個子線程做任務啦
主線程依然可以干別的事
子線程任務結束啦
方法二
#! -*-conding=: UTF-8 -*- # 2023/5/6 15:53 import time from threading import Thread class NewThread(Thread): def __init__(self): Thread.__init__(self) # 必須步驟 def run(self): # 入口是名字為run的方法 print("開始新的線程做一個任務啦") time.sleep(1) # 用time.sleep模擬任務耗時 print("這個新線程中的任務結束啦") if __name__ == '__main__': print("這里是主線程") # 創(chuàng)建線程對象 t1 = NewThread() # 啟動 t1.start() time.sleep(0.3) # 這里如果主線程結束,子線程會立刻退出,暫時先用sleep規(guī)避 print("主線程依然可以干別的事")
正式介紹threading模塊
關于線程信息的函數(shù):
threading.active_count()
:返回當前存活的Thread對象數(shù)量。threading.current_thread()
:返回當前線程的Thread對象。threading.enumerate()
:列表形式返回所有存活的Thread對象。threading.main_thread()
:返回主Thread對象。
Thread對象的方法及屬性:
Thread.name
:線程的名字,沒有語義,可以相同名稱。Thread.ident
:線程標識符,非零整數(shù)。Thread.Daemon
:是否為守護線程。Thread.is_alive()
:是否存活。Thread.start()
:開始線程活動。若多次調用拋出RuntimeError。Thread.run()
:用來重載的,Thread.join(timeout=None)
:等待直到線程正?;虍惓=Y束。尚未開始拋出RuntimeErrorThread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None)
:構造函數(shù)。
守護線程 Daemon
在Python 3中,守護線程(daemon thread)是一種特殊的線程,它在程序運行時在后臺運行,不會阻止程序的退出。當主線程退出時,守護線程也會自動退出,而不需要等待它執(zhí)行完畢。
方法一
在創(chuàng)建線程對象時,可以通過設置daemon
屬性為True
來創(chuàng)建守護線程,例如:
import threading import time def worker(): while True: print('Worker thread running') time.sleep(1) # 創(chuàng)建守護線程 t = threading.Thread(target=worker, daemon=True) # 啟動線程 t.start() # 主線程執(zhí)行一些操作 print('Main thread running') time.sleep(5) print('Main thread finished')
在這個示例中,我們創(chuàng)建了一個守護線程worker()
,并將daemon
屬性設置為True
。在主線程中,我們執(zhí)行了一些操作,并休眠5秒鐘。由于守護線程的存在,即使主線程已經(jīng)結束,守護線程仍會在后臺運行。
方法二
設置守護線程用Thread.setDaemon(bool)
#! -*-conding=: UTF-8 -*- # 2023/5/6 16:06 import time from threading import Thread def task1(): print("開始子線程1做任務1啦") time.sleep(1) # 用time.sleep模擬任務耗時 print("子線程1中的任務1結束啦") def task2(): print("開始子線程2做任務2啦") for i in range(5): print("任務2-{}".format(i)) time.sleep(1) print("子線程2中的任務2結束啦") if __name__ == '__main__': print("這里是主線程") # 創(chuàng)建線程對象 t1 = Thread(target=task1) t2 = Thread(target=task2) t2.setDaemon(True) # 設置為守護進程,必須在start之前 # 啟動 t1.start() t2.start() time.sleep(1) print("主線程結束了")
輸出結果為:
這里是主線程
開始子線程1做任務1啦
開始子線程2做任務2啦
任務2-0
主線程結束了
子線程1中的任務1結束啦任務2-1
守護線程的作用在于,當我們需要在程序運行時執(zhí)行一些后臺任務,但是不想讓這些任務阻止程序的正常退出時,可以使用守護線程。
例如,在一個Web應用程序中,我們可能需要啟動一個守護線程來定期清理緩存或者執(zhí)行一些后臺任務。
需要注意的是,守護線程無法完全控制其執(zhí)行過程,因此不能用于一些必須在程序退出之前完成的任務。同時,守護線程不能訪問一些主線程資源,例如共享內存或者打開的文件,因為這些資源可能會在主線程結束時被釋放。
讓主線程等待子線程結束 join
假如要讓主線程等子線程結束,那么可以使用Thread.join()方法。
當調用線程對象的join()
方法時,主線程將被阻塞,直到該線程執(zhí)行完成或者超時。
以下是一個簡單的示例:
import threading import time def worker(): print('Worker thread started') time.sleep(2) print('Worker thread finished') # 創(chuàng)建線程對象 t = threading.Thread(target=worker) # 啟動線程 t.start() # 等待線程結束 t.join() # 主線程繼續(xù)執(zhí)行 print('Main thread finished')
在這個示例中,我們創(chuàng)建了一個子線程worker()
,并使用start()
方法啟動線程。在主線程中,我們調用了線程對象的join()
方法,讓主線程等待子線程執(zhí)行完畢。在子線程執(zhí)行完畢后,主線程繼續(xù)執(zhí)行。
需要注意的是,join()
方法還可以設置超時時間,以避免無限期等待線程的執(zhí)行。例如:
import threading import time def worker(): print('Worker thread started') time.sleep(2) print('Worker thread finished') # 創(chuàng)建線程對象 t = threading.Thread(target=worker) # 啟動線程 t.start() # 等待線程結束,最多等待3秒鐘 t.join(3) # 主線程繼續(xù)執(zhí)行 print('Main thread finished')
在這個示例中,我們設置了join()
方法的超時時間為3秒鐘,即使子線程沒有執(zhí)行完成,主線程也會在3秒鐘后繼續(xù)執(zhí)行。
線程共享資源可能引起什么問題?
在線程編程中,多個線程可能同時訪問和修改同一個共享資源,例如全局變量、共享內存、文件等。如果沒有進行適當?shù)耐讲僮?,就可能會引發(fā)以下問題:
競態(tài)條件(Race Condition):當多個線程同時訪問和修改同一個共享資源時,就可能會發(fā)生競態(tài)條件。這種情況下,由于線程執(zhí)行順序的不確定性,可能會導致資源被錯誤地讀取或寫入,從而引發(fā)程序的錯誤或崩潰。
死鎖(Deadlock):當多個線程都在等待另一個線程釋放某個資源時,就可能會發(fā)生死鎖。這種情況下,程序會永久地阻塞在這個狀態(tài)下,無法繼續(xù)執(zhí)行。
活鎖(Livelock):多個線程相互協(xié)作,但是由于某些原因無法前進,導致它們不斷重試,最終導致系統(tǒng)陷入死循環(huán)?;铈i是一種比死鎖更難以診斷和解決的問題。
為了避免以上問題,我們可以使用線程同步機制來保護共享資源的訪問。
例如,可以使用鎖(Lock)
、信號量(Semaphore)
、條件變量(Condition)
等機制來限制同時訪問共享資源的線程數(shù)量,從而避免競態(tài)條件。同時,也可以使用一些算法和策略來避免死鎖和活鎖等問題的發(fā)生。
下面是一些具體的例子,說明在多線程程序中共享資源可能引發(fā)的問題:
競態(tài)條件
import threading x = 0 def increment(): global x x += 1 threads = [] for i in range(1000): t = threading.Thread(target=increment) threads.append(t) t.start() for t in threads: t.join() print('x =', x)
在這個例子中,我們創(chuàng)建了1000個線程來執(zhí)行increment()
函數(shù),這個函數(shù)會對全局變量x
進行加1操作。由于多個線程同時訪問和修改x
變量,就會產(chǎn)生競態(tài)條件,導致x
的最終值可能小于1000。
死鎖
import threading lock1 = threading.Lock() lock2 = threading.Lock() def worker1(): print('Worker 1 acquiring lock 1') lock1.acquire() print('Worker 1 acquired lock 1') print('Worker 1 acquiring lock 2') lock2.acquire() print('Worker 1 acquired lock 2') lock2.release() lock1.release() def worker2(): print('Worker 2 acquiring lock 2') lock2.acquire() print('Worker 2 acquired lock 2') print('Worker 2 acquiring lock 1') lock1.acquire() print('Worker 2 acquired lock 1') lock1.release() lock2.release() t1 = threading.Thread(target=worker1) t2 = threading.Thread(target=worker2) t1.start() t2.start() t1.join() t2.join() print('Main thread finished')
在這個例子中,我們創(chuàng)建了兩個線程worker1()
和worker2()
,它們都需要同時獲取lock1
和lock2
兩個鎖來執(zhí)行操作。由于worker1()
先獲取lock1
,然后嘗試獲取lock2
,而worker2()
先獲取lock2
,然后嘗試獲取lock1
,就可能會產(chǎn)生死鎖的情況。
活鎖
import threading class Account: def __init__(self, balance): self.balance = balance self.lock = threading.Lock() def withdraw(self, amount): with self.lock: if self.balance < amount: print('Withdraw failed: not enough balance') return False print(f'Withdraw {amount} from account') self.balance -= amount return True def transfer(self, target, amount): while True: if self.withdraw(amount): if target.deposit(amount): return True else: self.deposit(amount) else: return False def deposit(self, amount): with self.lock: print(f'Deposit {amount} to account') self.balance += amount return True def worker1(acc1, acc2): while True: acc1.transfer(acc2, 100) print('Worker 1: transfer complete') def worker2(acc1, acc2): while True: acc2.transfer(acc1, 100) print('Worker 2: transfer complete') acc1 = Account(1000) acc2 = Account(1000) t1 = threading.Thread(target=worker1, args=(acc1, acc2)) t2 = threading.Thread(target=worker2, args=(acc1, acc2)) t1.start() t2.start() t1.join() t2.join()
在這個例子中,我們創(chuàng)建了兩個賬戶acc1
和acc2
,并創(chuàng)建了兩個線程worker1()
和worker2()
,它們不斷地在這兩個賬戶之間轉賬。
由于transfer()
方法中需要獲取鎖來修改賬戶余額,但是兩個線程的執(zhí)行順序可能會導致它們同時等待對方釋放鎖,從而無法前進,最終導致系統(tǒng)陷入活鎖的狀態(tài)。
具體來說,假設worker1()
執(zhí)行了acc1.transfer(acc2, 100)
,然后進入了transfer()
方法中的if self.withdraw(amount)
分支,在等待acc1
的鎖。
此時,worker2()
執(zhí)行了acc2.transfer(acc1, 100)
,然后也進入了transfer()
方法中的if self.withdraw(amount)
分支,在等待acc2
的鎖。由于acc1
和acc2
之間的轉賬是相互依賴的,因此這兩個線程無法前進,會一直重試,最終導致系統(tǒng)陷入活鎖的狀態(tài)。
多線程的鎖機制
在Python3中,鎖機制是一種線程同步機制,它用于協(xié)調多個線程的并發(fā)訪問共享資源,以避免競態(tài)條件的發(fā)生。
Python 3中的多線程鎖機制主要是通過threading
模塊中的Lock
、RLock
和Semaphore
等類來實現(xiàn)的。
Lock
類是最基本的鎖,它提供了兩個基本方法acquire()
和release()
,用于獲取鎖和釋放鎖。當一個線程調用acquire()
方法時,如果該鎖沒有被其他線程獲取,則該線程獲取到該鎖并進入臨界區(qū),否則該線程就會被阻塞,直到該鎖被其他線程釋放為止。
RLock
類是可重入鎖,它允許同一個線程多次獲取該鎖,每次獲取都必須有對應的釋放操作。如果一個線程已經(jīng)獲取到該鎖,它可以再次獲取該鎖而不被阻塞,這就是可重入的特性。RLock
類提供了acquire()
和release()
方法,與Lock
類相同。
Semaphore
類是信號量,它與鎖類似,但可以允許多個線程同時訪問某個資源,而不是像鎖一樣只允許一個線程訪問。它提供了acquire()
和release()
方法,用于獲取和釋放資源。
下面是一個使用Lock
類的示例代碼:
import threading counter = 0 lock = threading.Lock() def increment(): global counter for i in range(100000): lock.acquire() counter += 1 lock.release() threads = [] for i in range(10): t = threading.Thread(target=increment) threads.append(t) t.start() for t in threads: t.join() print(counter)
上面的代碼中,我們定義了一個全局變量counter
和一個Lock
對象lock
。increment()
函數(shù)用于在循環(huán)中對counter
進行100000次加1操作,而在每次加1之前,我們首先獲取lock
,加1操作完成之后再釋放lock
。這樣保證了多個線程同時對counter
進行操作時,不會產(chǎn)生競爭條件。
另外,還需要注意到,對于每個獲取鎖的線程,一定要記得在合適的地方釋放鎖,否則就會出現(xiàn)死鎖的情況。
在多線程環(huán)境中,多個線程可能同時訪問某個共享資源,這可能導致競態(tài)條件的發(fā)生,從而導致程序出現(xiàn)不可預測的結果。為了避免這種情況的發(fā)生,我們可以使用鎖機制來控制對共享資源的訪問。在使用鎖機制時,需要注意以下幾點:
鎖是一種互斥機制,即同一時刻只能有一個線程持有鎖,其他線程必須等待該線程釋放鎖后才能繼續(xù)執(zhí)行。
在訪問共享資源前,線程需要先獲取鎖。如果鎖已經(jīng)被其他線程持有,則線程會被阻塞,直到其他線程釋放鎖。
在訪問共享資源后,線程需要釋放鎖,以便其他線程可以獲取鎖并訪問共享資源。
在使用鎖時,需要保證所有線程都使用同一個鎖對象。
鎖機制可以用于解決多線程程序中的競態(tài)條件、死鎖和活鎖等問題。
下面我們分別通過例子來說明鎖是如何解決這些問題的。
競態(tài)條件
競態(tài)條件指的是多個線程對共享資源的競爭,導致結果的正確性取決于線程的執(zhí)行順序。
比如,在一個多線程程序中,多個線程同時對同一個變量進行加減操作,結果可能取決于每個線程的執(zhí)行順序,這就是一個典型的競態(tài)條件。
通過使用鎖,可以保證在任何時刻只有一個線程能夠訪問共享資源,從而避免競態(tài)條件的出現(xiàn)。下面的例子演示了如何使用鎖來解決競態(tài)條件:
import threading class Counter: def __init__(self): self.count = 0 self.lock = threading.Lock() def increment(self): with self.lock: self.count += 1 def worker(counter, num_iters): for i in range(num_iters): counter.increment() counter = Counter() num_threads = 10 num_iters = 10000 threads = [threading.Thread(target=worker, args=(counter, num_iters)) for _ in range(num_threads)] for t in threads: t.start() for t in threads: t.join() print(counter.count)
在這個例子中,多個線程對 Counter 對象的 count 屬性進行加 1 操作,這可能會導致競態(tài)條件。
為了避免這種情況,我們使用了一個鎖,通過 with self.lock 來獲取鎖,這樣在任何時刻只有一個線程能夠修改 count 屬性。
這樣,我們就避免了競態(tài)條件的出現(xiàn)。
死鎖
死鎖是指兩個或多個線程在等待彼此釋放資源,從而形成僵局的情況。為了解決死鎖問題,可以使用鎖機制來協(xié)調線程對共享資源的訪問。具體來說,當一個線程獲得鎖時,其他線程必須等待該線程釋放鎖之后才能訪問共享資源,從而避免多個線程同時訪問同一個共享資源而產(chǎn)生死鎖。
例如,考慮一個簡單的場景,其中有兩個線程,分別需要獲取兩個共享資源才能繼續(xù)執(zhí)行。假設這兩個線程在獲取資源時的順序不同,可能會出現(xiàn)死鎖的情況。
import threading resource_a = threading.Lock() resource_b = threading.Lock() def thread_a(): resource_a.acquire() resource_b.acquire() print("Thread A acquired resource A and resource B") resource_b.release() resource_a.release() def thread_b(): resource_b.acquire() resource_a.acquire() print("Thread B acquired resource A and resource B") resource_a.release() resource_b.release() thread1 = threading.Thread(target=thread_a) thread2 = threading.Thread(target=thread_b) thread1.start() thread2.start() thread1.join() thread2.join()
上述代碼中,thread_a
和thread_b
分別獲取resource_a
和resource_b
,但是它們的獲取順序不同。因此,如果這兩個線程同時運行,就有可能發(fā)生死鎖的情況,導致程序卡住。
為了避免死鎖,可以使用鎖機制。修改上述代碼,如下所示:
import threading resource_a = threading.Lock() resource_b = threading.Lock() def thread_a(): resource_a.acquire() resource_b.acquire() print("Thread A acquired resource A and resource B") resource_b.release() resource_a.release() def thread_b(): resource_a.acquire() resource_b.acquire() print("Thread B acquired resource A and resource B") resource_b.release() resource_a.release() thread1 = threading.Thread(target=thread_a) thread2 = threading.Thread(target=thread_b) thread1.start() thread2.start() thread1.join() thread2.join()
在這個示例中,每個線程都按照相同的順序獲取鎖,這樣就避免了死鎖的情況。
活鎖
活鎖是多線程程序中的一種常見問題,它是指線程在嘗試協(xié)調其操作時一直重試,但最終沒有達到進展的狀態(tài)。一個常見的例子是兩個線程互相等待對方釋放其持有的資源。
使用鎖是解決活鎖問題的一種常見方式。當線程需要訪問共享資源時,必須獲得相應的鎖。如果鎖已經(jīng)被其他線程持有,線程將阻塞直到獲得鎖為止。這樣,當多個線程嘗試同時訪問同一共享資源時,只有一個線程能夠獲取鎖,其他線程將被阻塞。
下面是一個使用鎖解決活鎖問題的例子。假設有兩個線程A和B,它們需要同時訪問兩個共享資源x和y,但由于資源x和y的訪問順序不同,線程A需要先獲得x再獲得y,而線程B需要先獲得y再獲得x。如果兩個線程嘗試同時獲取它們需要的資源,就會出現(xiàn)活鎖問題。
使用鎖可以解決這個問題。假設每個線程都先獲取x的鎖,然后再獲取y的鎖,這樣就可以保證每個線程都按照相同的順序獲取資源,避免了死鎖和活鎖的問題。
下面是使用鎖解決活鎖問題的代碼示例:
import threading class Resource: def __init__(self): self.lock1 = threading.Lock() self.lock2 = threading.Lock() def get_x(self): self.lock1.acquire() return "x" def get_y(self): self.lock2.acquire() return "y" def release_x(self): self.lock1.release() def release_y(self): self.lock2.release() def thread_a(resource): while True: x = resource.get_x() y = resource.get_y() print("Thread A got resources x and y") resource.release_x() resource.release_y() def thread_b(resource): while True: y = resource.get_y() x = resource.get_x() print("Thread B got resources y and x") resource.release_y() resource.release_x() if __name__ == "__main__": resource = Resource() a = threading.Thread(target=thread_a, args=(resource,)) b = threading.Thread(target=thread_b, args=(resource,)) a.start() b.start()
在這個例子中,每個線程都使用相同的鎖順序來獲取資源x和y,這樣就避免了活鎖的問題。
使用鎖可能導致執(zhí)行速度慢,但是保證了線程安全
無論是Lock還是RLock,acquire和release都要成對出現(xiàn)
多線程的通信
Python3 中多線程之間的通信方式有以下幾種:
隊列
在 Python 3 中,可以使用隊列(Queue)實現(xiàn)多線程之間的通信。隊列是線程安全的數(shù)據(jù)結構,可以實現(xiàn)線程之間的同步和協(xié)作,避免競爭條件和死鎖問題。
Python 內置了 Queue 模塊,提供了隊列數(shù)據(jù)結構,它可以用于實現(xiàn)多線程之間的安全通信??梢允褂藐犃械?put() 方法往隊列中添加元素,使用 get() 方法從隊列中取出元素。
Queue模塊提供了以下幾個類:
- Queue:基本隊列,實現(xiàn)FIFO(先進先出)的算法。
- LifoQueue:與Queue類似,但是實現(xiàn)了LIFO(后進先出)的算法。
- PriorityQueue:隊列中的每個元素都有一個優(yōu)先級,每次彈出優(yōu)先級最高的元素。
- SimpleQueue:類似于Queue,但是沒有任務協(xié)作的功能,也就是說,不能在進程之間使用。
Queue類中最常用的方法包括:
- put(item[, block[, timeout]]):將一個item放入隊列,如果隊列已滿,block為True則阻塞,直到隊列未滿或超時;block為False時,則拋出queue.Full異常。
- get([block[, timeout]]):從隊列中取出并返回一個item,如果隊列為空,block為True則阻塞,直到隊列不為空或超時;block為False時,則拋出queue.Empty異常。
- task_done():通知隊列,一個先前放入隊列的任務已經(jīng)完成。
- join():阻塞主線程,直到隊列中所有的任務都被處理完。
下面舉一個簡單的例子:
import threading import queue import time # 生產(chǎn)者線程,負責向隊列中添加數(shù)據(jù) class ProducerThread(threading.Thread): def __init__(self, queue, name): threading.Thread.__init__(self) self.queue = queue self.name = name def run(self): for i in range(5): item = "item-" + str(i) self.queue.put(item) print(self.name, "produced", item) time.sleep(1) # 消費者線程,負責從隊列中取出數(shù)據(jù) class ConsumerThread(threading.Thread): def __init__(self, queue, name): threading.Thread.__init__(self) self.queue = queue self.name = name def run(self): while True: item = self.queue.get() if item is None: break print(self.name, "consumed", item) time.sleep(0.5) # 創(chuàng)建一個隊列對象 q = queue.Queue() # 創(chuàng)建兩個線程對象,分別作為生產(chǎn)者和消費者 producer = ProducerThread(q, "Producer") consumer = ConsumerThread(q, "Consumer") # 啟動線程 producer.start() consumer.start() # 等待生產(chǎn)者線程完成生產(chǎn) producer.join() # 停止消費者線程 q.put(None) consumer.join()
在上面的例子中,我們創(chuàng)建了一個生產(chǎn)者線程和一個消費者線程。生產(chǎn)者線程負責向隊列中添加數(shù)據(jù),消費者線程負責從隊列中取出數(shù)據(jù)。生產(chǎn)者線程每隔一秒鐘向隊列中添加一個字符串,消費者線程每隔半秒鐘從隊列中取出一個字符串。為了避免消費者線程在隊列為空時陷入死循環(huán),我們在隊列的末尾放置了一個 None 值,當消費者線程取出該值時,就會退出循環(huán)。
事件(Event)
事件是一種同步對象,可以用于多線程之間的通信,常用于控制線程的執(zhí)行順序??梢允褂檬录?set() 方法設置事件,使用 wait() 方法等待事件被設置,使用 clear() 方法清除事件。
以下是一個使用事件實現(xiàn)多線程間通信的示例代碼:
import threading def worker1(event): print('Worker 1 is waiting...') event.wait() print('Worker 1 is running...') def worker2(event): print('Worker 2 is waiting...') event.wait() print('Worker 2 is running...') event = threading.Event() t1 = threading.Thread(target=worker1, args=(event,)) t2 = threading.Thread(target=worker2, args=(event,)) t1.start() t2.start() print('Main thread is sleeping...') time.sleep(3) event.set() t1.join() t2.join()
該代碼創(chuàng)建了兩個線程,它們都等待事件被設置,當事件被設置后,它們才開始執(zhí)行。在主線程中,先休眠了 3 秒鐘,然后設置了事件,從而喚醒了兩個線程。在實際應用中,事件可以用于控制線程的執(zhí)行順序,或者實現(xiàn)線程之間的協(xié)作。
鎖(Lock)
使用鎖可以實現(xiàn)多線程間的通信,可以通過共享變量和鎖的機制來實現(xiàn)線程間的同步和互斥。具體來說,一個線程需要訪問共享變量時,首先需要獲得鎖,然后讀取或修改共享變量的值,完成操作后再釋放鎖,以便其他線程訪問共享變量。
- 下面是一個簡單的示例代碼,其中兩個線程共享一個變量
counter
,通過鎖的機制來實現(xiàn)對該變量的互斥訪問。
import threading class CounterThread(threading.Thread): def __init__(self, lock): super().__init__() self.lock = lock def run(self): global counter for i in range(100000): with self.lock: counter += 1 if __name__ == '__main__': lock = threading.Lock() counter = 0 threads = [CounterThread(lock) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(f'counter = {counter}')
在這個例子中,CounterThread
是一個繼承自 threading.Thread
的線程類,它有一個成員變量 lock
,用于控制對共享變量 counter
的訪問。在 run
方法中,線程會循環(huán)執(zhí)行一定次數(shù)的加操作,每次操作前會獲取鎖并對 counter
做加一操作,完成后再釋放鎖。在主線程中創(chuàng)建了 10 個 CounterThread
線程,并啟動它們進行計數(shù)操作。在所有線程都執(zhí)行完畢后,打印出 counter
的最終值。
使用鎖可以確保多個線程對共享變量的訪問是互斥的,從而避免競態(tài)條件和數(shù)據(jù)損壞等問題。但是,使用鎖也可能會導致性能問題和死鎖等問題,因此需要謹慎使用,并根據(jù)實際情況選擇合適的同步機制。
- 或者
import threading class Counter: def __init__(self): self.value = 0 self.lock = threading.Lock() def increment(self): with self.lock: self.value += 1 def worker(counter, n): for i in range(n): counter.increment() counter = Counter() threads = [] for i in range(10): t = threading.Thread(target=worker, args=(counter, 10000)) threads.append(t) for t in threads: t.start() for t in threads: t.join() print(counter.value)
在這個示例中,我們創(chuàng)建了一個 Counter
類,其中包含一個整數(shù) value
和一個鎖對象 lock
。 increment
方法使用 with
語句獲取鎖并增加 value
的值。
我們還創(chuàng)建了 10 個線程,每個線程都會調用 worker
函數(shù)。這個函數(shù)會循環(huán) 10000 次調用 increment
方法來增加 value
的值。
由于每個線程都會獲取鎖來訪問共享資源,因此只有一個線程可以訪問 increment
方法,避免了多個線程同時修改 value
的值,從而確保了線程安全。最終的輸出結果應該是 100000,即 10 個線程分別增加了 10000 次。
條件變量(Condition)實現(xiàn)多線程間的通信
條件變量(Condition)是Python多線程編程中常用的線程間通信機制之一,它可以用于線程間的通信和同步,提供了一個線程等待另一個線程通知其發(fā)生了某個事件的方法。
下面是一個使用條件變量實現(xiàn)多線程間通信的示例代碼:
import threading import time class Producer(threading.Thread): def __init__(self, name, cond): super().__init__(name=name) self.cond = cond def run(self): for i in range(10): with self.cond: print(f"{self.name} producing item {i}") self.cond.notify() # 通知消費者線程 self.cond.wait() # 等待消費者線程通知 class Consumer(threading.Thread): def __init__(self, name, cond): super().__init__(name=name) self.cond = cond def run(self): for i in range(10): with self.cond: self.cond.wait() # 等待生產(chǎn)者線程通知 print(f"{self.name} consuming item {i}") self.cond.notify() # 通知生產(chǎn)者線程 cond = threading.Condition() producer = Producer("Producer", cond) consumer = Consumer("Consumer", cond) producer.start() consumer.start() producer.join() consumer.join()
在這個示例代碼中,生產(chǎn)者線程通過cond.notify()
方法通知消費者線程,消費者線程通過cond.wait()
方法等待生產(chǎn)者線程的通知。條件變量cond
用于實現(xiàn)線程之間的同步和通信,生產(chǎn)者線程和消費者線程在共享同一把鎖的情況下,通過with self.cond:
語句獲取條件變量的鎖并進入臨界區(qū),確保線程安全。
信號量(Semaphore)實現(xiàn)多線程間的通信
信號量(Semaphore)是一種用于控制并發(fā)訪問共享資源的同步原語。它是一種計數(shù)器,用于控制多個線程對共享資源的訪問。信號量維護一個計數(shù)器,初始值為一個非負整數(shù),每當一個線程訪問共享資源時,計數(shù)器減1;當計數(shù)器為0時,所有試圖訪問共享資源的線程都會被阻塞,直到某個線程釋放了共享資源,此時計數(shù)器加1,被阻塞的線程才有機會繼續(xù)執(zhí)行。
在 Python 中,我們可以使用 threading.Semaphore
類來創(chuàng)建信號量對象。該類的構造函數(shù)接受一個整數(shù)作為參數(shù),表示初始計數(shù)器的值。Semaphore
類有兩個方法,acquire()
和 release()
,分別用于獲取和釋放信號量。
以下是使用信號量實現(xiàn)的簡單示例代碼:
import threading class Producer(threading.Thread): def __init__(self, name, buf, sem): super().__init__(name=name) self.buf = buf self.sem = sem def run(self): for i in range(5): self.sem.acquire() self.buf.append(i) print(f"{self.name} produced {i}") self.sem.release() class Consumer(threading.Thread): def __init__(self, name, buf, sem): super().__init__(name=name) self.buf = buf self.sem = sem def run(self): while True: self.sem.acquire() if not self.buf: self.sem.release() break item = self.buf.pop(0) print(f"{self.name} consumed {item}") self.sem.release() if __name__ == '__main__': buf = [] sem = threading.Semaphore(1) producer = Producer("Producer", buf, sem) consumer1 = Consumer("Consumer1", buf, sem) consumer2 = Consumer("Consumer2", buf, sem) producer.start() consumer1.start() consumer2.start() producer.join() consumer1.join() consumer2.join()
在這個示例代碼中,有一個生產(chǎn)者線程和兩個消費者線程。生產(chǎn)者線程向共享緩沖區(qū)中添加數(shù)據(jù),而消費者線程從緩沖區(qū)中獲取數(shù)據(jù)。為了避免競爭條件,我們使用了信號量。
在生產(chǎn)者線程中,當信號量可用時,它會獲取信號量并添加數(shù)據(jù)到緩沖區(qū)中,然后釋放信號量。在消費者線程中,當信號量可用時,它會獲取信號量并從緩沖區(qū)中獲取數(shù)據(jù),然后釋放信號量。
通過使用信號量,我們可以確保生產(chǎn)者和消費者線程之間的同步,從而避免了競爭條件和死鎖問題。
管道(Pipe)
在 Python3 中,可以使用 multiprocessing
模塊中的 Pipe
類來實現(xiàn)多進程間的通信,也可以用 multiprocessing.connection
模塊來創(chuàng)建多進程間的通信通道。下面的例子是用 Pipe
類來實現(xiàn)多線程間的通信:
import threading from multiprocessing import Pipe def producer(pipe): for i in range(5): pipe.send(i) print(f"{threading.current_thread().name} produced {i}") pipe.close() def consumer(pipe): while True: try: item = pipe.recv() print(f"{threading.current_thread().name} consumed {item}") except EOFError: break if __name__ == '__main__': producer_conn, consumer_conn = Pipe() producer_thread = threading.Thread(target=producer, args=(producer_conn,)) consumer_thread = threading.Thread(target=consumer, args=(consumer_conn,)) producer_thread.start() consumer_thread.start() producer_thread.join() consumer_thread.join()
在這個例子中,我們創(chuàng)建了兩個線程,一個生產(chǎn)者線程和一個消費者線程。它們之間共享一個管道(Pipe
),其中生產(chǎn)者將數(shù)據(jù)寫入管道,而消費者從管道中讀取數(shù)據(jù)。當生產(chǎn)者完成其工作后,它會關閉管道以通知消費者停止運行。
需要注意的是,在 Pipe
中,發(fā)送和接收操作是阻塞的。因此,在發(fā)送或接收數(shù)據(jù)時,如果沒有可用的空間或數(shù)據(jù),線程將被阻塞,直到有數(shù)據(jù)可用或空間可用。
定時器Timer
定時器(Timer)是Python中的一個線程類,它可以在一定時間之后調用指定的函數(shù)或方法。Timer是繼承自Thread類的,因此可以像Thread一樣啟動、停止和等待它。
定時器的構造函數(shù)如下:
class threading.Timer(interval, function, args=None, kwargs=None)
這個程序中,我們創(chuàng)建了一個定時器t
,它會在5秒后調用hello
函數(shù),并啟動定時器。程序將在啟動定時器后立即繼續(xù)執(zhí)行,而定時器則在后臺等待5秒,然后調用hello
函數(shù)。
如果我們想要停止定時器,可以使用cancel()
方法:
t.cancel() # 停止定時器
需要注意的是,如果定時器已經(jīng)超時并且在調用函數(shù)之前被取消,那么函數(shù)將不會被調用。因此,需要在調用cancel()
方法之前等待定時器超時。
python3線程池
concurrent.futures實現(xiàn)多線程
Python 3中的線程池是一種常見的多線程編程模型,可以提高多線程程序的性能和可維護性。在Python 3中,線程池可以通過標準庫中的concurrent.futures
模塊來實現(xiàn)。
concurrent.futures
模塊定義了兩個類:ThreadPoolExecutor
和ProcessPoolExecutor
。這兩個類都實現(xiàn)了Python 3中的執(zhí)行器(Executor)接口,提供了一種方便的方式來異步執(zhí)行函數(shù)或方法,并返回其結果。
Exectuor 提供了如下常用方法:
submit(fn, *args, **kwargs)
:將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。map(func, *iterables, timeout=None, chunksize=1)
:該函數(shù)類似于全局函數(shù)map(func, *iterables)
,只是該函數(shù)將會啟動多個線程,以異步方式立即對 iterables 執(zhí)行 map 處理。超時拋出TimeoutError錯誤。返回每個函數(shù)的結果,注意不是返回future。shutdown(wait=True)
:關閉線程池。關閉之后線程池不再接受新任務,但會將之前提交的任務完成。
程序將task函數(shù)submit給線程池后,會返回一個Future對象,F(xiàn)uture主要用來獲取task的返回值。
Future 提供了如下方法:
cancel()
:取消該 Future 代表的線程任務。如果該任務正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會取消該任務,并返回 True。cancelled()
:返回 Future 代表的線程任務是否被成功取消。running()
:如果該 Future 代表的線程任務正在執(zhí)行、不可被取消,該方法返回 True。done()
:如果該 Funture 代表的線程任務被成功取消或執(zhí)行完成,則該方法返回 True。result(timeout=None)
:獲取該 Future 代表的線程任務最后返回的結果。如果 Future 代表的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。超時拋出TimeoutError,取消拋出CancelledError。exception(timeout=None)
:獲取該 Future 代表的線程任務所引發(fā)的異常。如果該任務成功完成,沒有異常,則該方法返回 None。add_done_callback(fn)
:為該 Future 代表的線程任務注冊一個“回調函數(shù)”,當該任務成功完成時,程序會自動觸發(fā)該 fn 函數(shù),參數(shù)是future。
使用線程池來執(zhí)行線程任務的步驟如下:
- 調用 ThreadPoolExecutor 類的構造器創(chuàng)建一個線程池。
- 定義一個普通函數(shù)作為線程任務。
- 調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。
- 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。
ThreadPoolExecutor
是一個線程池執(zhí)行器,可以用來執(zhí)行異步任務,它管理著一個線程池,其中包含若干個線程。當一個任務被提交給執(zhí)行器時,執(zhí)行器會將其分配給一個線程來執(zhí)行。當線程池中的所有線程都在執(zhí)行任務時,新提交的任務會被放入隊列中,直到有可用的線程為止。
以下是一個使用ThreadPoolExecutor
的簡單示例:
from concurrent.futures import ThreadPoolExecutor import time def worker(num): print(f"Worker {num} starting") time.sleep(1) print(f"Worker {num} finished") if __name__ == '__main__': with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(worker, i) for i in range(5)] for future in concurrent.futures.as_completed(futures): try: result = future.result() except Exception as e: print(f"Task raised an exception: {e}") else: print(f"Task returned: {result}")
在這個例子中,我們創(chuàng)建了一個線程池執(zhí)行器,并指定了最大線程數(shù)為3。然后我們循環(huán)提交5個任務給執(zhí)行器,每個任務都是一個worker
函數(shù),并傳入不同的參數(shù)。由于我們設置了最大線程數(shù)為3,所以只會有3個任務同時被執(zhí)行,另外2個任務會在之后的某個時間點被執(zhí)行。
執(zhí)行結果會存儲在Future
對象中,我們可以通過as_completed()
方法獲取任務的執(zhí)行結果。如果任務執(zhí)行過程中發(fā)生了異常,result()
方法會拋出相應的異常。否則,它會返回任務的執(zhí)行結果。
ThreadPoolExecutor
還有其他一些有用的方法,例如shutdown()
方法可以等待所有已提交的任務執(zhí)行完畢并關閉線程池。
總之,Python 3中的線程池提供了一種方便的方式來執(zhí)行異步任務,可以大大提高多線程程序的性能和可維護性。
使用線程池的好處和場景
使用線程池的優(yōu)點是可以避免線程的頻繁創(chuàng)建和銷毀,從而提高線程的利用率,減少系統(tǒng)的開銷。因此,當需要頻繁執(zhí)行短時間的任務時,可以考慮使用線程池。例如:
- 網(wǎng)絡服務器:在服務器端接收客戶端請求后,可以使用線程池來處理客戶端請求,以提高服務器的并發(fā)性能。
- 圖像處理:在圖像處理過程中,需要頻繁啟動和停止線程來處理每個像素點的計算,使用線程池可以減少線程的創(chuàng)建和銷毀,提高處理效率。
- 數(shù)據(jù)庫連接池:在數(shù)據(jù)庫操作中,需要頻繁創(chuàng)建和銷毀數(shù)據(jù)庫連接,使用線程池可以減少這種開銷,提高數(shù)據(jù)庫操作的效率。
總之,當需要頻繁執(zhí)行短時間的任務時,可以考慮使用線程池來優(yōu)化程序性能。
以上就是Python3多線程詳解的詳細內容,更多關于Python3多線程的資料請關注腳本之家其它相關文章!
相關文章
python利用thrift服務讀取hbase數(shù)據(jù)的方法
今天小編就為大家分享一篇python利用thrift服務讀取hbase數(shù)據(jù)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12python wxpython 實現(xiàn)界面跳轉功能
wxpython沒提供界面跳轉的方式,所以就需要借助threading模塊,本文給大家分享python wxpython 實現(xiàn)界面跳轉功能,感興趣的朋友跟隨小編一起看看吧2019-12-12SpringMVC和SpringBoot接收參數(shù)的幾種方式詳解
這篇文章主要介紹了SpringMVC和SpringBoot接收參數(shù)的幾種方式詳解,Spring是分層的JavaSE/EE應用輕量級開源框架,以IoC和AOP為內核,提供了展現(xiàn)層 Spring MVC和持久層Spring JDBC以及業(yè)務層事務管理等眾多的企業(yè)級應用技術,需要的朋友可以參考下2023-07-07