欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python3多線程詳解

 更新時間:2023年05月08日 09:28:12   作者:答案永恒  
使用多線程,可以同時進行多項任務,可以使用戶界面更友好,還可以后臺執(zhí)行某些用時長的任務,同時具有易于通信的優(yōu)點。python3中多線程的實現(xiàn)使用了threading模塊,它允許同一進程中運行多個線程,本文介紹的非常詳細,,需要的朋友可以參考下

為什么要使用多線程?

使用多線程,可以同時進行多項任務,可以使用戶界面更友好,還可以后臺執(zhí)行某些用時長的任務,同時具有易于通信的優(yōu)點。

python3中多線程的實現(xiàn)使用了threading模塊,它允許同一進程中運行多個線程。

如何創(chuàng)建和執(zhí)行一個線程

一般我們有兩種方法來創(chuàng)建線程,一種是以某個函數(shù)來作為起點,另一種是繼承Thread類。

方法一

獲取一個Thread對象,構造參數(shù)中target是起點函數(shù),注意不要加括號。假如起點函數(shù)有參數(shù),則可以通過args輸入元組參數(shù)或者kwargs輸入字典參數(shù)。

#! -*-conding=: UTF-8 -*-
# 2023/5/6 15:53
import time
from threading import Thread


def task():
    print("另外開始一個子線程做任務啦")
    time.sleep(1)  # 用time.sleep模擬任務耗時
    print("子線程任務結束啦")


if __name__ == '__main__':
    print("這里是主線程")
    # 創(chuàng)建線程對象
    t1 = Thread(target=task)
    # 啟動
    t1.start()
    time.sleep(0.3)
    print("主線程依然可以干別的事")

輸出結果為:

這里是主線程
另外開始一個子線程做任務啦
主線程依然可以干別的事
子線程任務結束啦

方法二

#! -*-conding=: UTF-8 -*-
# 2023/5/6 15:53
import time
from threading import Thread


class NewThread(Thread):
    def __init__(self):
        Thread.__init__(self)  # 必須步驟

    def run(self):  # 入口是名字為run的方法
        print("開始新的線程做一個任務啦")
        time.sleep(1)  # 用time.sleep模擬任務耗時
        print("這個新線程中的任務結束啦")


if __name__ == '__main__':
    print("這里是主線程")
    # 創(chuàng)建線程對象
    t1 = NewThread()
    # 啟動
    t1.start()
    time.sleep(0.3)  # 這里如果主線程結束,子線程會立刻退出,暫時先用sleep規(guī)避
    print("主線程依然可以干別的事")

正式介紹threading模塊

docs.python.org/3/library/t…

 

關于線程信息的函數(shù):

  • threading.active_count():返回當前存活的Thread對象數(shù)量。
  • threading.current_thread():返回當前線程的Thread對象。
  • threading.enumerate():列表形式返回所有存活的Thread對象。
  • threading.main_thread():返回主Thread對象。

Thread對象的方法及屬性:

  • Thread.name:線程的名字,沒有語義,可以相同名稱。
  • Thread.ident:線程標識符,非零整數(shù)。
  • Thread.Daemon:是否為守護線程。
  • Thread.is_alive():是否存活。
  • Thread.start():開始線程活動。若多次調用拋出RuntimeError。
  • Thread.run():用來重載的,
  • Thread.join(timeout=None):等待直到線程正?;虍惓=Y束。尚未開始拋出RuntimeError
  • Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None):構造函數(shù)。

守護線程 Daemon

在Python 3中,守護線程(daemon thread)是一種特殊的線程,它在程序運行時在后臺運行,不會阻止程序的退出。當主線程退出時,守護線程也會自動退出,而不需要等待它執(zhí)行完畢。

方法一

在創(chuàng)建線程對象時,可以通過設置daemon屬性為True來創(chuàng)建守護線程,例如:

import threading
import time

def worker():
    while True:
        print('Worker thread running')
        time.sleep(1)

# 創(chuàng)建守護線程
t = threading.Thread(target=worker, daemon=True)
# 啟動線程
t.start()

# 主線程執(zhí)行一些操作
print('Main thread running')
time.sleep(5)
print('Main thread finished')

在這個示例中,我們創(chuàng)建了一個守護線程worker(),并將daemon屬性設置為True。在主線程中,我們執(zhí)行了一些操作,并休眠5秒鐘。由于守護線程的存在,即使主線程已經(jīng)結束,守護線程仍會在后臺運行。

方法二

設置守護線程用Thread.setDaemon(bool)

#! -*-conding=: UTF-8 -*-
# 2023/5/6 16:06


import time
from threading import Thread


def task1():
    print("開始子線程1做任務1啦")
    time.sleep(1)  # 用time.sleep模擬任務耗時
    print("子線程1中的任務1結束啦")


def task2():
    print("開始子線程2做任務2啦")
    for i in range(5):
        print("任務2-{}".format(i))
        time.sleep(1)
    print("子線程2中的任務2結束啦")


if __name__ == '__main__':
    print("這里是主線程")
    # 創(chuàng)建線程對象
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)
    t2.setDaemon(True)  # 設置為守護進程,必須在start之前
    # 啟動
    t1.start()
    t2.start()
    time.sleep(1)
    print("主線程結束了")

輸出結果為:

這里是主線程
開始子線程1做任務1啦
開始子線程2做任務2啦
任務2-0
主線程結束了
子線程1中的任務1結束啦任務2-1

守護線程的作用在于,當我們需要在程序運行時執(zhí)行一些后臺任務,但是不想讓這些任務阻止程序的正常退出時,可以使用守護線程。
例如,在一個Web應用程序中,我們可能需要啟動一個守護線程來定期清理緩存或者執(zhí)行一些后臺任務。

需要注意的是,守護線程無法完全控制其執(zhí)行過程,因此不能用于一些必須在程序退出之前完成的任務。同時,守護線程不能訪問一些主線程資源,例如共享內存或者打開的文件,因為這些資源可能會在主線程結束時被釋放。

讓主線程等待子線程結束 join

假如要讓主線程等子線程結束,那么可以使用Thread.join()方法。

當調用線程對象的join()方法時,主線程將被阻塞,直到該線程執(zhí)行完成或者超時。

以下是一個簡單的示例:

import threading
import time

def worker():
    print('Worker thread started')
    time.sleep(2)
    print('Worker thread finished')

# 創(chuàng)建線程對象
t = threading.Thread(target=worker)
# 啟動線程
t.start()

# 等待線程結束
t.join()

# 主線程繼續(xù)執(zhí)行
print('Main thread finished')

在這個示例中,我們創(chuàng)建了一個子線程worker(),并使用start()方法啟動線程。在主線程中,我們調用了線程對象的join()方法,讓主線程等待子線程執(zhí)行完畢。在子線程執(zhí)行完畢后,主線程繼續(xù)執(zhí)行。

需要注意的是,join()方法還可以設置超時時間,以避免無限期等待線程的執(zhí)行。例如:

import threading
import time

def worker():
    print('Worker thread started')
    time.sleep(2)
    print('Worker thread finished')

# 創(chuàng)建線程對象
t = threading.Thread(target=worker)
# 啟動線程
t.start()

# 等待線程結束,最多等待3秒鐘
t.join(3)

# 主線程繼續(xù)執(zhí)行
print('Main thread finished')

在這個示例中,我們設置了join()方法的超時時間為3秒鐘,即使子線程沒有執(zhí)行完成,主線程也會在3秒鐘后繼續(xù)執(zhí)行。

線程共享資源可能引起什么問題?

在線程編程中,多個線程可能同時訪問和修改同一個共享資源,例如全局變量、共享內存、文件等。如果沒有進行適當?shù)耐讲僮?,就可能會引發(fā)以下問題:

競態(tài)條件(Race Condition):當多個線程同時訪問和修改同一個共享資源時,就可能會發(fā)生競態(tài)條件。這種情況下,由于線程執(zhí)行順序的不確定性,可能會導致資源被錯誤地讀取或寫入,從而引發(fā)程序的錯誤或崩潰。

死鎖(Deadlock):當多個線程都在等待另一個線程釋放某個資源時,就可能會發(fā)生死鎖。這種情況下,程序會永久地阻塞在這個狀態(tài)下,無法繼續(xù)執(zhí)行。

活鎖(Livelock):多個線程相互協(xié)作,但是由于某些原因無法前進,導致它們不斷重試,最終導致系統(tǒng)陷入死循環(huán)?;铈i是一種比死鎖更難以診斷和解決的問題。

為了避免以上問題,我們可以使用線程同步機制來保護共享資源的訪問。

例如,可以使用鎖(Lock)、信號量(Semaphore)條件變量(Condition)等機制來限制同時訪問共享資源的線程數(shù)量,從而避免競態(tài)條件。同時,也可以使用一些算法和策略來避免死鎖和活鎖等問題的發(fā)生。

下面是一些具體的例子,說明在多線程程序中共享資源可能引發(fā)的問題:

競態(tài)條件

import threading

x = 0

def increment():
    global x
    x += 1

threads = []
for i in range(1000):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print('x =', x)

在這個例子中,我們創(chuàng)建了1000個線程來執(zhí)行increment()函數(shù),這個函數(shù)會對全局變量x進行加1操作。由于多個線程同時訪問和修改x變量,就會產(chǎn)生競態(tài)條件,導致x的最終值可能小于1000。

死鎖

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def worker1():
    print('Worker 1 acquiring lock 1')
    lock1.acquire()
    print('Worker 1 acquired lock 1')
    print('Worker 1 acquiring lock 2')
    lock2.acquire()
    print('Worker 1 acquired lock 2')
    lock2.release()
    lock1.release()

def worker2():
    print('Worker 2 acquiring lock 2')
    lock2.acquire()
    print('Worker 2 acquired lock 2')
    print('Worker 2 acquiring lock 1')
    lock1.acquire()
    print('Worker 2 acquired lock 1')
    lock1.release()
    lock2.release()

t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
t1.join()
t2.join()

print('Main thread finished')

在這個例子中,我們創(chuàng)建了兩個線程worker1()worker2(),它們都需要同時獲取lock1lock2兩個鎖來執(zhí)行操作。由于worker1()先獲取lock1,然后嘗試獲取lock2,而worker2()先獲取lock2,然后嘗試獲取lock1,就可能會產(chǎn)生死鎖的情況。

活鎖

import threading

class Account:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        with self.lock:
            if self.balance < amount:
                print('Withdraw failed: not enough balance')
                return False
            print(f'Withdraw {amount} from account')
            self.balance -= amount
            return True

    def transfer(self, target, amount):
        while True:
            if self.withdraw(amount):
                if target.deposit(amount):
                    return True
                else:
                    self.deposit(amount)
            else:
                return False

    def deposit(self, amount):
        with self.lock:
            print(f'Deposit {amount} to account')
            self.balance += amount
            return True

def worker1(acc1, acc2):
    while True:
        acc1.transfer(acc2, 100)
        print('Worker 1: transfer complete')

def worker2(acc1, acc2):
    while True:
        acc2.transfer(acc1, 100)
        print('Worker 2: transfer complete')

acc1 = Account(1000)
acc2 = Account(1000)
t1 = threading.Thread(target=worker1, args=(acc1, acc2))
t2 = threading.Thread(target=worker2, args=(acc1, acc2))
t1.start()
t2.start()
t1.join()
t2.join()

在這個例子中,我們創(chuàng)建了兩個賬戶acc1acc2,并創(chuàng)建了兩個線程worker1()worker2(),它們不斷地在這兩個賬戶之間轉賬。

由于transfer()方法中需要獲取鎖來修改賬戶余額,但是兩個線程的執(zhí)行順序可能會導致它們同時等待對方釋放鎖,從而無法前進,最終導致系統(tǒng)陷入活鎖的狀態(tài)。

具體來說,假設worker1()執(zhí)行了acc1.transfer(acc2, 100),然后進入了transfer()方法中的if self.withdraw(amount)分支,在等待acc1的鎖。

此時,worker2()執(zhí)行了acc2.transfer(acc1, 100),然后也進入了transfer()方法中的if self.withdraw(amount)分支,在等待acc2的鎖。由于acc1acc2之間的轉賬是相互依賴的,因此這兩個線程無法前進,會一直重試,最終導致系統(tǒng)陷入活鎖的狀態(tài)。

多線程的鎖機制

在Python3中,鎖機制是一種線程同步機制,它用于協(xié)調多個線程的并發(fā)訪問共享資源,以避免競態(tài)條件的發(fā)生。

Python 3中的多線程鎖機制主要是通過threading模塊中的Lock、RLockSemaphore等類來實現(xiàn)的。

Lock類是最基本的鎖,它提供了兩個基本方法acquire()release(),用于獲取鎖和釋放鎖。當一個線程調用acquire()方法時,如果該鎖沒有被其他線程獲取,則該線程獲取到該鎖并進入臨界區(qū),否則該線程就會被阻塞,直到該鎖被其他線程釋放為止。

RLock類是可重入鎖,它允許同一個線程多次獲取該鎖,每次獲取都必須有對應的釋放操作。如果一個線程已經(jīng)獲取到該鎖,它可以再次獲取該鎖而不被阻塞,這就是可重入的特性。RLock類提供了acquire()release()方法,與Lock類相同。

Semaphore類是信號量,它與鎖類似,但可以允許多個線程同時訪問某個資源,而不是像鎖一樣只允許一個線程訪問。它提供了acquire()release()方法,用于獲取和釋放資源。

下面是一個使用Lock類的示例代碼:

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for i in range(100000):
        lock.acquire()
        counter += 1
        lock.release()

threads = []
for i in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter)

上面的代碼中,我們定義了一個全局變量counter和一個Lock對象lock。increment()函數(shù)用于在循環(huán)中對counter進行100000次加1操作,而在每次加1之前,我們首先獲取lock,加1操作完成之后再釋放lock。這樣保證了多個線程同時對counter進行操作時,不會產(chǎn)生競爭條件。

另外,還需要注意到,對于每個獲取鎖的線程,一定要記得在合適的地方釋放鎖,否則就會出現(xiàn)死鎖的情況。

在多線程環(huán)境中,多個線程可能同時訪問某個共享資源,這可能導致競態(tài)條件的發(fā)生,從而導致程序出現(xiàn)不可預測的結果。為了避免這種情況的發(fā)生,我們可以使用鎖機制來控制對共享資源的訪問。在使用鎖機制時,需要注意以下幾點:

  • 鎖是一種互斥機制,即同一時刻只能有一個線程持有鎖,其他線程必須等待該線程釋放鎖后才能繼續(xù)執(zhí)行。

  • 在訪問共享資源前,線程需要先獲取鎖。如果鎖已經(jīng)被其他線程持有,則線程會被阻塞,直到其他線程釋放鎖。

  • 在訪問共享資源后,線程需要釋放鎖,以便其他線程可以獲取鎖并訪問共享資源。

  • 在使用鎖時,需要保證所有線程都使用同一個鎖對象。

鎖機制可以用于解決多線程程序中的競態(tài)條件、死鎖和活鎖等問題。

下面我們分別通過例子來說明鎖是如何解決這些問題的。

競態(tài)條件

競態(tài)條件指的是多個線程對共享資源的競爭,導致結果的正確性取決于線程的執(zhí)行順序。

比如,在一個多線程程序中,多個線程同時對同一個變量進行加減操作,結果可能取決于每個線程的執(zhí)行順序,這就是一個典型的競態(tài)條件。

通過使用鎖,可以保證在任何時刻只有一個線程能夠訪問共享資源,從而避免競態(tài)條件的出現(xiàn)。下面的例子演示了如何使用鎖來解決競態(tài)條件:

import threading

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

def worker(counter, num_iters):
    for i in range(num_iters):
        counter.increment()

counter = Counter()
num_threads = 10
num_iters = 10000
threads = [threading.Thread(target=worker, args=(counter, num_iters)) for _ in range(num_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.count)

在這個例子中,多個線程對 Counter 對象的 count 屬性進行加 1 操作,這可能會導致競態(tài)條件。

為了避免這種情況,我們使用了一個鎖,通過 with self.lock 來獲取鎖,這樣在任何時刻只有一個線程能夠修改 count 屬性。

這樣,我們就避免了競態(tài)條件的出現(xiàn)。

死鎖

死鎖是指兩個或多個線程在等待彼此釋放資源,從而形成僵局的情況。為了解決死鎖問題,可以使用鎖機制來協(xié)調線程對共享資源的訪問。具體來說,當一個線程獲得鎖時,其他線程必須等待該線程釋放鎖之后才能訪問共享資源,從而避免多個線程同時訪問同一個共享資源而產(chǎn)生死鎖。

例如,考慮一個簡單的場景,其中有兩個線程,分別需要獲取兩個共享資源才能繼續(xù)執(zhí)行。假設這兩個線程在獲取資源時的順序不同,可能會出現(xiàn)死鎖的情況。

import threading

resource_a = threading.Lock()
resource_b = threading.Lock()

def thread_a():
    resource_a.acquire()
    resource_b.acquire()
    print("Thread A acquired resource A and resource B")
    resource_b.release()
    resource_a.release()

def thread_b():
    resource_b.acquire()
    resource_a.acquire()
    print("Thread B acquired resource A and resource B")
    resource_a.release()
    resource_b.release()

thread1 = threading.Thread(target=thread_a)
thread2 = threading.Thread(target=thread_b)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

上述代碼中,thread_athread_b分別獲取resource_aresource_b,但是它們的獲取順序不同。因此,如果這兩個線程同時運行,就有可能發(fā)生死鎖的情況,導致程序卡住。

為了避免死鎖,可以使用鎖機制。修改上述代碼,如下所示:

import threading

resource_a = threading.Lock()
resource_b = threading.Lock()

def thread_a():
    resource_a.acquire()
    resource_b.acquire()
    print("Thread A acquired resource A and resource B")
    resource_b.release()
    resource_a.release()

def thread_b():
    resource_a.acquire()
    resource_b.acquire()
    print("Thread B acquired resource A and resource B")
    resource_b.release()
    resource_a.release()

thread1 = threading.Thread(target=thread_a)
thread2 = threading.Thread(target=thread_b)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

在這個示例中,每個線程都按照相同的順序獲取鎖,這樣就避免了死鎖的情況。

活鎖

活鎖是多線程程序中的一種常見問題,它是指線程在嘗試協(xié)調其操作時一直重試,但最終沒有達到進展的狀態(tài)。一個常見的例子是兩個線程互相等待對方釋放其持有的資源。

使用鎖是解決活鎖問題的一種常見方式。當線程需要訪問共享資源時,必須獲得相應的鎖。如果鎖已經(jīng)被其他線程持有,線程將阻塞直到獲得鎖為止。這樣,當多個線程嘗試同時訪問同一共享資源時,只有一個線程能夠獲取鎖,其他線程將被阻塞。

下面是一個使用鎖解決活鎖問題的例子。假設有兩個線程A和B,它們需要同時訪問兩個共享資源x和y,但由于資源x和y的訪問順序不同,線程A需要先獲得x再獲得y,而線程B需要先獲得y再獲得x。如果兩個線程嘗試同時獲取它們需要的資源,就會出現(xiàn)活鎖問題。

使用鎖可以解決這個問題。假設每個線程都先獲取x的鎖,然后再獲取y的鎖,這樣就可以保證每個線程都按照相同的順序獲取資源,避免了死鎖和活鎖的問題。

下面是使用鎖解決活鎖問題的代碼示例:

import threading

class Resource:
    def __init__(self):
        self.lock1 = threading.Lock()
        self.lock2 = threading.Lock()

    def get_x(self):
        self.lock1.acquire()
        return "x"

    def get_y(self):
        self.lock2.acquire()
        return "y"

    def release_x(self):
        self.lock1.release()

    def release_y(self):
        self.lock2.release()

def thread_a(resource):
    while True:
        x = resource.get_x()
        y = resource.get_y()
        print("Thread A got resources x and y")
        resource.release_x()
        resource.release_y()

def thread_b(resource):
    while True:
        y = resource.get_y()
        x = resource.get_x()
        print("Thread B got resources y and x")
        resource.release_y()
        resource.release_x()

if __name__ == "__main__":
    resource = Resource()
    a = threading.Thread(target=thread_a, args=(resource,))
    b = threading.Thread(target=thread_b, args=(resource,))
    a.start()
    b.start()

在這個例子中,每個線程都使用相同的鎖順序來獲取資源x和y,這樣就避免了活鎖的問題。

使用鎖可能導致執(zhí)行速度慢,但是保證了線程安全
無論是Lock還是RLock,acquire和release都要成對出現(xiàn)

多線程的通信

Python3 中多線程之間的通信方式有以下幾種:

隊列

在 Python 3 中,可以使用隊列(Queue)實現(xiàn)多線程之間的通信。隊列是線程安全的數(shù)據(jù)結構,可以實現(xiàn)線程之間的同步和協(xié)作,避免競爭條件和死鎖問題。

Python 內置了 Queue 模塊,提供了隊列數(shù)據(jù)結構,它可以用于實現(xiàn)多線程之間的安全通信??梢允褂藐犃械?put() 方法往隊列中添加元素,使用 get() 方法從隊列中取出元素。

Queue模塊提供了以下幾個類:

  • Queue:基本隊列,實現(xiàn)FIFO(先進先出)的算法。
  • LifoQueue:與Queue類似,但是實現(xiàn)了LIFO(后進先出)的算法。
  • PriorityQueue:隊列中的每個元素都有一個優(yōu)先級,每次彈出優(yōu)先級最高的元素。
  • SimpleQueue:類似于Queue,但是沒有任務協(xié)作的功能,也就是說,不能在進程之間使用。

Queue類中最常用的方法包括:

  • put(item[, block[, timeout]]):將一個item放入隊列,如果隊列已滿,block為True則阻塞,直到隊列未滿或超時;block為False時,則拋出queue.Full異常。
  • get([block[, timeout]]):從隊列中取出并返回一個item,如果隊列為空,block為True則阻塞,直到隊列不為空或超時;block為False時,則拋出queue.Empty異常。
  • task_done():通知隊列,一個先前放入隊列的任務已經(jīng)完成。
  • join():阻塞主線程,直到隊列中所有的任務都被處理完。

下面舉一個簡單的例子:

import threading
import queue
import time

# 生產(chǎn)者線程,負責向隊列中添加數(shù)據(jù)
class ProducerThread(threading.Thread):
    def __init__(self, queue, name):
        threading.Thread.__init__(self)
        self.queue = queue
        self.name = name

    def run(self):
        for i in range(5):
            item = "item-" + str(i)
            self.queue.put(item)
            print(self.name, "produced", item)
            time.sleep(1)

# 消費者線程,負責從隊列中取出數(shù)據(jù)
class ConsumerThread(threading.Thread):
    def __init__(self, queue, name):
        threading.Thread.__init__(self)
        self.queue = queue
        self.name = name

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            print(self.name, "consumed", item)
            time.sleep(0.5)

# 創(chuàng)建一個隊列對象
q = queue.Queue()

# 創(chuàng)建兩個線程對象,分別作為生產(chǎn)者和消費者
producer = ProducerThread(q, "Producer")
consumer = ConsumerThread(q, "Consumer")

# 啟動線程
producer.start()
consumer.start()

# 等待生產(chǎn)者線程完成生產(chǎn)
producer.join()

# 停止消費者線程
q.put(None)
consumer.join()

在上面的例子中,我們創(chuàng)建了一個生產(chǎn)者線程和一個消費者線程。生產(chǎn)者線程負責向隊列中添加數(shù)據(jù),消費者線程負責從隊列中取出數(shù)據(jù)。生產(chǎn)者線程每隔一秒鐘向隊列中添加一個字符串,消費者線程每隔半秒鐘從隊列中取出一個字符串。為了避免消費者線程在隊列為空時陷入死循環(huán),我們在隊列的末尾放置了一個 None 值,當消費者線程取出該值時,就會退出循環(huán)。

事件(Event)

事件是一種同步對象,可以用于多線程之間的通信,常用于控制線程的執(zhí)行順序??梢允褂檬录?set() 方法設置事件,使用 wait() 方法等待事件被設置,使用 clear() 方法清除事件。

以下是一個使用事件實現(xiàn)多線程間通信的示例代碼:

import threading

def worker1(event):
    print('Worker 1 is waiting...')
    event.wait()
    print('Worker 1 is running...')

def worker2(event):
    print('Worker 2 is waiting...')
    event.wait()
    print('Worker 2 is running...')

event = threading.Event()

t1 = threading.Thread(target=worker1, args=(event,))
t2 = threading.Thread(target=worker2, args=(event,))

t1.start()
t2.start()

print('Main thread is sleeping...')
time.sleep(3)
event.set()

t1.join()
t2.join()

該代碼創(chuàng)建了兩個線程,它們都等待事件被設置,當事件被設置后,它們才開始執(zhí)行。在主線程中,先休眠了 3 秒鐘,然后設置了事件,從而喚醒了兩個線程。在實際應用中,事件可以用于控制線程的執(zhí)行順序,或者實現(xiàn)線程之間的協(xié)作。

鎖(Lock)

使用鎖可以實現(xiàn)多線程間的通信,可以通過共享變量和鎖的機制來實現(xiàn)線程間的同步和互斥。具體來說,一個線程需要訪問共享變量時,首先需要獲得鎖,然后讀取或修改共享變量的值,完成操作后再釋放鎖,以便其他線程訪問共享變量。

  • 下面是一個簡單的示例代碼,其中兩個線程共享一個變量 counter,通過鎖的機制來實現(xiàn)對該變量的互斥訪問。
import threading

class CounterThread(threading.Thread):
    def __init__(self, lock):
        super().__init__()
        self.lock = lock

    def run(self):
        global counter
        for i in range(100000):
            with self.lock:
                counter += 1

if __name__ == '__main__':
    lock = threading.Lock()
    counter = 0
    threads = [CounterThread(lock) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f'counter = {counter}')

在這個例子中,CounterThread 是一個繼承自 threading.Thread 的線程類,它有一個成員變量 lock,用于控制對共享變量 counter 的訪問。在 run 方法中,線程會循環(huán)執(zhí)行一定次數(shù)的加操作,每次操作前會獲取鎖并對 counter 做加一操作,完成后再釋放鎖。在主線程中創(chuàng)建了 10 個 CounterThread 線程,并啟動它們進行計數(shù)操作。在所有線程都執(zhí)行完畢后,打印出 counter 的最終值。

使用鎖可以確保多個線程對共享變量的訪問是互斥的,從而避免競態(tài)條件和數(shù)據(jù)損壞等問題。但是,使用鎖也可能會導致性能問題和死鎖等問題,因此需要謹慎使用,并根據(jù)實際情況選擇合適的同步機制。

  • 或者
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1

def worker(counter, n):
    for i in range(n):
        counter.increment()

counter = Counter()
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(counter, 10000))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()

print(counter.value)

在這個示例中,我們創(chuàng)建了一個 Counter 類,其中包含一個整數(shù) value 和一個鎖對象 lock。 increment 方法使用 with 語句獲取鎖并增加 value 的值。

我們還創(chuàng)建了 10 個線程,每個線程都會調用 worker 函數(shù)。這個函數(shù)會循環(huán) 10000 次調用 increment 方法來增加 value 的值。

由于每個線程都會獲取鎖來訪問共享資源,因此只有一個線程可以訪問 increment 方法,避免了多個線程同時修改 value 的值,從而確保了線程安全。最終的輸出結果應該是 100000,即 10 個線程分別增加了 10000 次。

條件變量(Condition)實現(xiàn)多線程間的通信

條件變量(Condition)是Python多線程編程中常用的線程間通信機制之一,它可以用于線程間的通信和同步,提供了一個線程等待另一個線程通知其發(fā)生了某個事件的方法。

下面是一個使用條件變量實現(xiàn)多線程間通信的示例代碼:

import threading
import time

class Producer(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        for i in range(10):
            with self.cond:
                print(f"{self.name} producing item {i}")
                self.cond.notify() # 通知消費者線程
                self.cond.wait() # 等待消費者線程通知

class Consumer(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        for i in range(10):
            with self.cond:
                self.cond.wait() # 等待生產(chǎn)者線程通知
                print(f"{self.name} consuming item {i}")
                self.cond.notify() # 通知生產(chǎn)者線程

cond = threading.Condition()
producer = Producer("Producer", cond)
consumer = Consumer("Consumer", cond)
producer.start()
consumer.start()
producer.join()
consumer.join()

在這個示例代碼中,生產(chǎn)者線程通過cond.notify()方法通知消費者線程,消費者線程通過cond.wait()方法等待生產(chǎn)者線程的通知。條件變量cond用于實現(xiàn)線程之間的同步和通信,生產(chǎn)者線程和消費者線程在共享同一把鎖的情況下,通過with self.cond:語句獲取條件變量的鎖并進入臨界區(qū),確保線程安全。

信號量(Semaphore)實現(xiàn)多線程間的通信

信號量(Semaphore)是一種用于控制并發(fā)訪問共享資源的同步原語。它是一種計數(shù)器,用于控制多個線程對共享資源的訪問。信號量維護一個計數(shù)器,初始值為一個非負整數(shù),每當一個線程訪問共享資源時,計數(shù)器減1;當計數(shù)器為0時,所有試圖訪問共享資源的線程都會被阻塞,直到某個線程釋放了共享資源,此時計數(shù)器加1,被阻塞的線程才有機會繼續(xù)執(zhí)行。

在 Python 中,我們可以使用 threading.Semaphore 類來創(chuàng)建信號量對象。該類的構造函數(shù)接受一個整數(shù)作為參數(shù),表示初始計數(shù)器的值。Semaphore 類有兩個方法,acquire()release(),分別用于獲取和釋放信號量。

以下是使用信號量實現(xiàn)的簡單示例代碼:

import threading

class Producer(threading.Thread):
    def __init__(self, name, buf, sem):
        super().__init__(name=name)
        self.buf = buf
        self.sem = sem

    def run(self):
        for i in range(5):
            self.sem.acquire()
            self.buf.append(i)
            print(f"{self.name} produced {i}")
            self.sem.release()

class Consumer(threading.Thread):
    def __init__(self, name, buf, sem):
        super().__init__(name=name)
        self.buf = buf
        self.sem = sem

    def run(self):
        while True:
            self.sem.acquire()
            if not self.buf:
                self.sem.release()
                break
            item = self.buf.pop(0)
            print(f"{self.name} consumed {item}")
            self.sem.release()

if __name__ == '__main__':
    buf = []
    sem = threading.Semaphore(1)
    producer = Producer("Producer", buf, sem)
    consumer1 = Consumer("Consumer1", buf, sem)
    consumer2 = Consumer("Consumer2", buf, sem)
    producer.start()
    consumer1.start()
    consumer2.start()
    producer.join()
    consumer1.join()
    consumer2.join()

在這個示例代碼中,有一個生產(chǎn)者線程和兩個消費者線程。生產(chǎn)者線程向共享緩沖區(qū)中添加數(shù)據(jù),而消費者線程從緩沖區(qū)中獲取數(shù)據(jù)。為了避免競爭條件,我們使用了信號量。

在生產(chǎn)者線程中,當信號量可用時,它會獲取信號量并添加數(shù)據(jù)到緩沖區(qū)中,然后釋放信號量。在消費者線程中,當信號量可用時,它會獲取信號量并從緩沖區(qū)中獲取數(shù)據(jù),然后釋放信號量。

通過使用信號量,我們可以確保生產(chǎn)者和消費者線程之間的同步,從而避免了競爭條件和死鎖問題。

管道(Pipe)

在 Python3 中,可以使用 multiprocessing 模塊中的 Pipe 類來實現(xiàn)多進程間的通信,也可以用 multiprocessing.connection 模塊來創(chuàng)建多進程間的通信通道。下面的例子是用 Pipe 類來實現(xiàn)多線程間的通信:

import threading
from multiprocessing import Pipe

def producer(pipe):
    for i in range(5):
        pipe.send(i)
        print(f"{threading.current_thread().name} produced {i}")
    pipe.close()

def consumer(pipe):
    while True:
        try:
            item = pipe.recv()
            print(f"{threading.current_thread().name} consumed {item}")
        except EOFError:
            break

if __name__ == '__main__':
    producer_conn, consumer_conn = Pipe()
    producer_thread = threading.Thread(target=producer, args=(producer_conn,))
    consumer_thread = threading.Thread(target=consumer, args=(consumer_conn,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

在這個例子中,我們創(chuàng)建了兩個線程,一個生產(chǎn)者線程和一個消費者線程。它們之間共享一個管道(Pipe),其中生產(chǎn)者將數(shù)據(jù)寫入管道,而消費者從管道中讀取數(shù)據(jù)。當生產(chǎn)者完成其工作后,它會關閉管道以通知消費者停止運行。

需要注意的是,在 Pipe 中,發(fā)送和接收操作是阻塞的。因此,在發(fā)送或接收數(shù)據(jù)時,如果沒有可用的空間或數(shù)據(jù),線程將被阻塞,直到有數(shù)據(jù)可用或空間可用。

定時器Timer

定時器(Timer)是Python中的一個線程類,它可以在一定時間之后調用指定的函數(shù)或方法。Timer是繼承自Thread類的,因此可以像Thread一樣啟動、停止和等待它。

定時器的構造函數(shù)如下:

class threading.Timer(interval, function, args=None, kwargs=None)

這個程序中,我們創(chuàng)建了一個定時器t,它會在5秒后調用hello函數(shù),并啟動定時器。程序將在啟動定時器后立即繼續(xù)執(zhí)行,而定時器則在后臺等待5秒,然后調用hello函數(shù)。

如果我們想要停止定時器,可以使用cancel()方法:

t.cancel()  # 停止定時器

需要注意的是,如果定時器已經(jīng)超時并且在調用函數(shù)之前被取消,那么函數(shù)將不會被調用。因此,需要在調用cancel()方法之前等待定時器超時。

python3線程池

concurrent.futures實現(xiàn)多線程

Python 3中的線程池是一種常見的多線程編程模型,可以提高多線程程序的性能和可維護性。在Python 3中,線程池可以通過標準庫中的concurrent.futures模塊來實現(xiàn)。

concurrent.futures模塊定義了兩個類:ThreadPoolExecutorProcessPoolExecutor。這兩個類都實現(xiàn)了Python 3中的執(zhí)行器(Executor)接口,提供了一種方便的方式來異步執(zhí)行函數(shù)或方法,并返回其結果。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。
  • map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會啟動多個線程,以異步方式立即對 iterables 執(zhí)行 map 處理。超時拋出TimeoutError錯誤。返回每個函數(shù)的結果,注意不是返回future。
  • shutdown(wait=True):關閉線程池。關閉之后線程池不再接受新任務,但會將之前提交的任務完成。

程序將task函數(shù)submit給線程池后,會返回一個Future對象,F(xiàn)uture主要用來獲取task的返回值。

Future 提供了如下方法:

  • cancel():取消該 Future 代表的線程任務。如果該任務正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會取消該任務,并返回 True。
  • cancelled():返回 Future 代表的線程任務是否被成功取消。
  • running():如果該 Future 代表的線程任務正在執(zhí)行、不可被取消,該方法返回 True。
  • done():如果該 Funture 代表的線程任務被成功取消或執(zhí)行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 代表的線程任務最后返回的結果。如果 Future 代表的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。超時拋出TimeoutError,取消拋出CancelledError。
  • exception(timeout=None):獲取該 Future 代表的線程任務所引發(fā)的異常。如果該任務成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):為該 Future 代表的線程任務注冊一個“回調函數(shù)”,當該任務成功完成時,程序會自動觸發(fā)該 fn 函數(shù),參數(shù)是future。

使用線程池來執(zhí)行線程任務的步驟如下:

  • 調用 ThreadPoolExecutor 類的構造器創(chuàng)建一個線程池。
  • 定義一個普通函數(shù)作為線程任務。
  • 調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。
  • 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。

ThreadPoolExecutor是一個線程池執(zhí)行器,可以用來執(zhí)行異步任務,它管理著一個線程池,其中包含若干個線程。當一個任務被提交給執(zhí)行器時,執(zhí)行器會將其分配給一個線程來執(zhí)行。當線程池中的所有線程都在執(zhí)行任務時,新提交的任務會被放入隊列中,直到有可用的線程為止。

以下是一個使用ThreadPoolExecutor的簡單示例:

from concurrent.futures import ThreadPoolExecutor
import time

def worker(num):
    print(f"Worker {num} starting")
    time.sleep(1)
    print(f"Worker {num} finished")

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
        except Exception as e:
            print(f"Task raised an exception: {e}")
        else: 
            print(f"Task returned: {result}")

在這個例子中,我們創(chuàng)建了一個線程池執(zhí)行器,并指定了最大線程數(shù)為3。然后我們循環(huán)提交5個任務給執(zhí)行器,每個任務都是一個worker函數(shù),并傳入不同的參數(shù)。由于我們設置了最大線程數(shù)為3,所以只會有3個任務同時被執(zhí)行,另外2個任務會在之后的某個時間點被執(zhí)行。

執(zhí)行結果會存儲在Future對象中,我們可以通過as_completed()方法獲取任務的執(zhí)行結果。如果任務執(zhí)行過程中發(fā)生了異常,result()方法會拋出相應的異常。否則,它會返回任務的執(zhí)行結果。

ThreadPoolExecutor還有其他一些有用的方法,例如shutdown()方法可以等待所有已提交的任務執(zhí)行完畢并關閉線程池。

總之,Python 3中的線程池提供了一種方便的方式來執(zhí)行異步任務,可以大大提高多線程程序的性能和可維護性。

使用線程池的好處和場景

使用線程池的優(yōu)點是可以避免線程的頻繁創(chuàng)建和銷毀,從而提高線程的利用率,減少系統(tǒng)的開銷。因此,當需要頻繁執(zhí)行短時間的任務時,可以考慮使用線程池。例如:

  • 網(wǎng)絡服務器:在服務器端接收客戶端請求后,可以使用線程池來處理客戶端請求,以提高服務器的并發(fā)性能。
  • 圖像處理:在圖像處理過程中,需要頻繁啟動和停止線程來處理每個像素點的計算,使用線程池可以減少線程的創(chuàng)建和銷毀,提高處理效率。
  • 數(shù)據(jù)庫連接池:在數(shù)據(jù)庫操作中,需要頻繁創(chuàng)建和銷毀數(shù)據(jù)庫連接,使用線程池可以減少這種開銷,提高數(shù)據(jù)庫操作的效率。

總之,當需要頻繁執(zhí)行短時間的任務時,可以考慮使用線程池來優(yōu)化程序性能。

以上就是Python3多線程詳解的詳細內容,更多關于Python3多線程的資料請關注腳本之家其它相關文章!

相關文章

  • 利用scikitlearn畫ROC曲線實例

    利用scikitlearn畫ROC曲線實例

    這篇文章主要介紹了利用scikitlearn畫ROC曲線實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-07-07
  • python利用thrift服務讀取hbase數(shù)據(jù)的方法

    python利用thrift服務讀取hbase數(shù)據(jù)的方法

    今天小編就為大家分享一篇python利用thrift服務讀取hbase數(shù)據(jù)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-12-12
  • 解決Django layui {{}}沖突的問題

    解決Django layui {{}}沖突的問題

    今天小編就為大家分享一篇解決Django layui {{}}沖突的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-08-08
  • ???????Python?入門學習之函數(shù)式編程

    ???????Python?入門學習之函數(shù)式編程

    這篇文章主要介紹了???????Python?入門學習之函數(shù)式編程,?Python?中的函數(shù)式編程技術進行了簡單的入門介紹,下文詳細內容需要的小伙伴可以參考一下
    2022-05-05
  • python wxpython 實現(xiàn)界面跳轉功能

    python wxpython 實現(xiàn)界面跳轉功能

    wxpython沒提供界面跳轉的方式,所以就需要借助threading模塊,本文給大家分享python wxpython 實現(xiàn)界面跳轉功能,感興趣的朋友跟隨小編一起看看吧
    2019-12-12
  • python實現(xiàn)超時退出的三種方式總結

    python實現(xiàn)超時退出的三種方式總結

    這篇文章主要介紹了python實現(xiàn)超時退出的三種方式總結,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • python原類、類的創(chuàng)建過程與方法詳解

    python原類、類的創(chuàng)建過程與方法詳解

    在本篇文章里小編給各位分享了關于python原類、類的創(chuàng)建過程與方法的相關知識點內容,有興趣的朋友們跟著學習參考下。
    2019-07-07
  • Tensorflow 查看變量的值方法

    Tensorflow 查看變量的值方法

    今天小編就為大家分享一篇Tensorflow 查看變量的值方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-06-06
  • python實現(xiàn)矩陣打印

    python實現(xiàn)矩陣打印

    這篇文章主要為大家詳細介紹了python實現(xiàn)矩陣打印的相關代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-03-03
  • SpringMVC和SpringBoot接收參數(shù)的幾種方式詳解

    SpringMVC和SpringBoot接收參數(shù)的幾種方式詳解

    這篇文章主要介紹了SpringMVC和SpringBoot接收參數(shù)的幾種方式詳解,Spring是分層的JavaSE/EE應用輕量級開源框架,以IoC和AOP為內核,提供了展現(xiàn)層 Spring MVC和持久層Spring JDBC以及業(yè)務層事務管理等眾多的企業(yè)級應用技術,需要的朋友可以參考下
    2023-07-07

最新評論