Python3標準庫之threading進程中管理并發(fā)操作方法
1. threading進程中管理并發(fā)操作
threading模塊提供了管理多個線程執(zhí)行的API,允許程序在同一個進程空間并發(fā)的運行多個操作。
1.1 Thread對象
要使用Thread,最簡單的方法就是用一個目標函數(shù)實例化一個Thread對象,并調(diào)用start()讓它開始工作。
import threading def worker(): """thread worker function""" print('Worker') threads = [] for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start()
輸出有5行,每一行都是"Worker"。
如果能夠創(chuàng)建一個線程,并向它傳遞參數(shù)告訴它要完成什么工作,那么這會很有用。任何類型的對象都可以作為參數(shù)傳遞到線程。下面的例子傳遞了一個數(shù),線程將打印出這個數(shù)。
import threading def worker(num): """thread worker function""" print('Worker: %s' % num) threads = [] for i in range(5): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()
現(xiàn)在這個整數(shù)參數(shù)會包含在各線程打印的消息中。
1.2 確定當前線程
使用參數(shù)來標識或命名線程很麻煩,也沒有必要。每個Thread實例都有一個帶有默認值的名,該默認值可以在創(chuàng)建線程時改變。如果服務器進程中有多個服務線程處理不同的操作,那么在這樣的服務器進程中,對線程命名就很有用。
import threading import time def worker(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.2) print(threading.current_thread().getName(), 'Exiting') def my_service(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.3) print(threading.current_thread().getName(), 'Exiting') t = threading.Thread(name='my_service', target=my_service) w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) # use default name w.start() w2.start() t.start()
調(diào)試輸出的每一行中包含有當前線程的名。線程名列中有"Thread-1"的行對應未命名的線程w2。
大多數(shù)程序并不使用print來進行調(diào)試。logging模塊支持將線程名嵌入到各個日志消息中(使用格式化代碼%(threadName)s)。通過把線程名包含在日志消息中,就能跟蹤這些消息的來源。
import logging import threading import time def worker(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def my_service(): logging.debug('Starting') time.sleep(0.3) logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s', ) t = threading.Thread(name='my_service', target=my_service) w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) # use default name w.start() w2.start() t.start()
而且logging是線程安全的,所以來自不同線程的消息在輸出中會有所區(qū)分。
1.3 守護與非守護線程
到目前為止,示例程序都在隱式地等待所有線程完成工作之后才退出。不過,程序有時會創(chuàng)建一個線程作為守護線程(daemon),這個線程可以一直運行而不阻塞主程序退出。
如果一個服務不能很容易地中斷線程,或者即使讓線程工作到一半時中止也不會造成數(shù)據(jù)損失或破壞(例如,為一個服務監(jiān)控工具生成“心跳”的線程),那么對于這些服務,使用守護線程就很有用。要標志一個線程為守護線程,構造線程時便要傳入daemon=True
或者要調(diào)用它的setDaemon()
方法并提供參數(shù)True。默認情況下線程不作為守護線程。
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start()
這個代碼的輸出中不包含守護線程的“Exiting“消息,因為在從sleep()調(diào)用喚醒守護線程之前,所有非守護線程(包括主線程)已經(jīng)退出。
要等待一個守護線程完成工作,需要使用join()方法。
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start() d.join() t.join()
使用join()等待守護線程退出意味著它有機會生成它的"Exiting"消息。
默認地,join()會無限阻塞?;蛘?,還可以傳入一個浮點值,表示等待線程在多長時間(秒數(shù))后變?yōu)椴换顒?。即使線程在這個時間段內(nèi)未完成,join()也會返回。
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start() d.join(0.1) print('d.isAlive()', d.isAlive()) t.join()
由于傳人的超時時間小于守護線程睡眠的時間,所以join()返回之后這個線程仍是"活著"。
1.4 枚舉所有線程
沒有必要為所有守護線程維護一個顯示句柄來確保它們在退出主進程之前已經(jīng)完成。
enumerate()會返回活動 Thread實例的一個列表。這個列表也包括當前線程,由于等待當前線程終止(join)會引入一種死鎖情況,所以必須跳過。
import random import threading import time import logging def worker(): """thread worker function""" pause = random.randint(1, 5) / 10 logging.debug('sleeping %0.2f', pause) time.sleep(pause) logging.debug('ending') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(3): t = threading.Thread(target=worker, daemon=True) t.start() main_thread = threading.main_thread() for t in threading.enumerate(): if t is main_thread: continue logging.debug('joining %s', t.getName()) t.join()
由于工作線程睡眠的時間量是隨機的,所以這個程序的輸出可能有變化。
1.5 派生線程
開始時,Thread要完成一些基本初始化,然后調(diào)用其run()方法,這會調(diào)用傳遞到構造函數(shù)的目標函數(shù)。要創(chuàng)建Thread的一個子類,需要覆蓋run()來完成所需的工作。
import threading import logging class MyThread(threading.Thread): def run(self): logging.debug('running') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(5): t = MyThread() t.start()
run()的返回值將被忽略。
由于傳遞到Thread構造函數(shù)的args和kwargs值保存在私有變量中(這些變量名都有前綴),所以不能很容易地從子類訪問這些值。要向一個定制的線程類型傳遞參數(shù),需要重新定義構造函數(shù),將這些值保存在子類可見的一個實例屬性中。
import threading import logging class MyThreadWithArgs(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): super().__init__(group=group, target=target, name=name, daemon=daemon) self.args = args self.kwargs = kwargs def run(self): logging.debug('running with %s and %s', self.args, self.kwargs) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(5): t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'}) t.start()
MyThreadwithArgs
使用的API與Thread相同,不過類似于其他定制類,這個類可以輕松地修改構造函數(shù)方法,以取得更多參數(shù)或者與線程用途更直接相關的不同參數(shù)。
1.6 定時器線程
有時出于某種原因需要派生Thread,Timer就是這樣一個例子,Timer也包含在threading中。Timer在一個延遲之后開始工作,而且可以在這個延遲期間內(nèi)的任意時刻被取消。
import threading import time import logging def delayed(): logging.debug('worker running') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) t1 = threading.Timer(0.3, delayed) t1.setName('t1') t2 = threading.Timer(0.3, delayed) t2.setName('t2') logging.debug('starting timers') t1.start() t2.start() logging.debug('waiting before canceling %s', t2.getName()) time.sleep(0.2) logging.debug('canceling %s', t2.getName()) t2.cancel() logging.debug('done')
這個例子中,第二個定時器永遠不會運行,看起來第一個定時器在主程序的其余部分完成之后還會運行。由于這不是一個守護線程,所以在主線程完成時其會隱式退出。
1.7 線程間傳送信號
盡管使用多線程的目的是并發(fā)地運行單獨的操作,但有時也需要在兩個或多個線程中同步操作。事件對象是實現(xiàn)線程間安全通信的一種簡單方法。Event管理一個內(nèi)部標志,調(diào)用者可以用set()和clear()方法控制這個標志。其他線程可以使用wait()暫停,直到這個標志被設置,可有效地阻塞進程直至允許這些線程繼續(xù)。
import logging import threading import time def wait_for_event(e): """Wait for the event to be set before doing anything""" logging.debug('wait_for_event starting') event_is_set = e.wait() logging.debug('event set: %s', event_is_set) def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" while not e.is_set(): logging.debug('wait_for_event_timeout starting') event_is_set = e.wait(t) logging.debug('event set: %s', event_is_set) if event_is_set: logging.debug('processing event') else: logging.debug('doing other work') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) e = threading.Event() t1 = threading.Thread( name='block', target=wait_for_event, args=(e,), ) t1.start() t2 = threading.Thread( name='nonblock', target=wait_for_event_timeout, args=(e, 2), ) t2.start() logging.debug('Waiting before calling Event.set()') time.sleep(0.3) e.set() logging.debug('Event is set')
wait()方法取一個參數(shù),表示等待事件的時間(秒數(shù)),達到這個時間后就超時。它會返回一個布爾值,指示事件是否已設置,使調(diào)用者知道wait()為什么返回??梢詫κ录为毜厥褂胕s_set()方法而不必擔心阻塞。
在這個例子中,wait_for_event_timeout()
將檢查事件狀態(tài)而不會無限阻塞。wait_for_event()
在wait()調(diào)用的位置阻塞,事件狀態(tài)改變之前它不會返回。
1.8 控制資源訪問
除了同步線程操作,還有一點很重要,要能夠控制對共享資源的訪問,從而避免破壞或丟失數(shù)據(jù)。Python的內(nèi)置數(shù)據(jù)結(jié)構(列表、字典等)是線程安全的,這是Python使用原子字節(jié)碼來管理這些數(shù)據(jù)結(jié)構的一個副作用(更新過程中不會釋放保護Python內(nèi)部數(shù)據(jù)結(jié)構的全局解釋器鎖GIL(Global Interpreter Lock))。Python中實現(xiàn)的其他數(shù)據(jù)結(jié)構或更簡單的類型(如整數(shù)和浮點數(shù))則沒有這個保護。要保證同時安全地訪問一個對象,可以使用一個Lock對象。
import logging import random import threading import time class Counter: def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): logging.debug('Waiting for lock') self.lock.acquire() try: logging.debug('Acquired lock') self.value = self.value + 1 finally: self.lock.release() def worker(c): for i in range(2): pause = random.random() logging.debug('Sleeping %0.02f', pause) time.sleep(pause) c.increment() logging.debug('Done') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) counter = Counter() for i in range(2): t = threading.Thread(target=worker, args=(counter,)) t.start() logging.debug('Waiting for worker threads') main_thread = threading.main_thread() for t in threading.enumerate(): if t is not main_thread: t.join() logging.debug('Counter: %d', counter.value)
在這個例子中,worker()函數(shù)使一個Counter實例遞增,這個實例管理著一個Lock,以避免兩個線程同時改變其內(nèi)部狀態(tài)。如果沒有使用Lock,就有可能丟失一次對value屬性的修改。
要確定是否有另一個線程請求這個鎖而不影響當前線程,可以向acquire()
的blocking參數(shù)傳入False。在下一個例子中,worker()想要分別得到3次鎖,并統(tǒng)計為得到鎖而嘗試的次數(shù)。與此同時,lock_holder()
在占有和釋放鎖之間循環(huán),每個狀態(tài)會短暫暫停,以模擬負載情況。
import logging import threading import time def lock_holder(lock): logging.debug('Starting') while True: lock.acquire() try: logging.debug('Holding') time.sleep(0.5) finally: logging.debug('Not holding') lock.release() time.sleep(0.5) def worker(lock): logging.debug('Starting') num_tries = 0 num_acquires = 0 while num_acquires < 3: time.sleep(0.5) logging.debug('Trying to acquire') have_it = lock.acquire(0) try: num_tries += 1 if have_it: logging.debug('Iteration %d: Acquired', num_tries) num_acquires += 1 else: logging.debug('Iteration %d: Not acquired', num_tries) finally: if have_it: lock.release() logging.debug('Done after %d iterations', num_tries) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() holder = threading.Thread( target=lock_holder, args=(lock,), name='LockHolder', daemon=True, ) holder.start() worker = threading.Thread( target=worker, args=(lock,), name='Worker', ) worker.start()
worker()需要超過3次迭代才能得到3次鎖。
1.8.1 再入鎖
正常的Lock對象不能請求多次,即使是由同一個線程請求也不例外。如果同一個調(diào)用鏈中的多個函數(shù)訪問一個鎖,則可能會產(chǎn)生我們不希望的副作用。
import threading lock = threading.Lock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
在這里,對第二個acquire()
調(diào)用給定超時值為0,以避免阻塞,因為鎖已經(jīng)被第一個調(diào)用獲得。
如果同一個線程的不同代碼需要"重新獲得"鎖,那么在這種情況下要使用RLock。
import threading lock = threading.RLock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
與前面的例子相比,對代碼唯一的修改就是用RLock替換Lock。
1.8.2 鎖作為上下文管理器
鎖實現(xiàn)了上下文管理器API,并與with語句兼容。使用with則不再需要顯式地獲得和釋放鎖。
import threading import logging def worker_with(lock): with lock: logging.debug('Lock acquired via with') def worker_no_with(lock): lock.acquire() try: logging.debug('Lock acquired directly') finally: lock.release() logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() w = threading.Thread(target=worker_with, args=(lock,)) nw = threading.Thread(target=worker_no_with, args=(lock,)) w.start() nw.start()
函數(shù)worker_with()
和worker_no_with()
用等價的方式管理鎖。
1.9 同步線程
除了使用Event,還可以通過使用一個Condition對象來同步線程。由于Condition使用了一個Lock,所以它可以綁定到一個共享資源,允許多個線程等待資源更新。在下一個例子中,consumer()線程要等待設置了Condition才能繼續(xù)。producer()線程負責設置條件,以及通知其他線程繼續(xù)。
import logging import threading import time def consumer(cond): """wait for the condition and use the resource""" logging.debug('Starting consumer thread') with cond: cond.wait() logging.debug('Resource is available to consumer') def producer(cond): """set up the resource to be used by the consumer""" logging.debug('Starting producer thread') with cond: logging.debug('Making resource available') cond.notifyAll() logging.basicConfig( level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s', ) condition = threading.Condition() c1 = threading.Thread(name='c1', target=consumer, args=(condition,)) c2 = threading.Thread(name='c2', target=consumer, args=(condition,)) p = threading.Thread(name='p', target=producer, args=(condition,)) c1.start() time.sleep(0.2) c2.start() time.sleep(0.2) p.start()
這些線程使用with來獲得與Condition關聯(lián)的鎖。也可以顯式地使用acquire()和release()
方法。
屏障(barrier)是另一種線程同步機制。Barrier會建立一個控制點,所有參與線程會在這里阻塞,直到所有這些參與“方”都到達這一點。采用這種方法,線程可以單獨啟動然后暫停,直到所有線程都準備好才可以繼續(xù)。
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) worker_id = barrier.wait() print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS = 3 barrier = threading.Barrier(NUM_THREADS) threads = [ threading.Thread( name='worker-%s' % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) for t in threads: t.join()
在這個例子中,Barrier被配置為會阻塞線程,直到3個線程都在等待。滿足這個條件時,所有線程被同時釋放從而越過這個控制點。wait()的返回值指示了釋放的參與線程數(shù),可以用來限制一些線程做清理資源等動作。
Barrier的abort()方法會使所有等待線程接收一個BrokenBarrierError。如果線程在wait()上被阻塞而停止處理,這就允許線程完成清理工作。
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) try: worker_id = barrier.wait() except threading.BrokenBarrierError: print(threading.current_thread().name, 'aborting') else: print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS = 3 barrier = threading.Barrier(NUM_THREADS + 1) threads = [ threading.Thread( name='worker-%s' % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) barrier.abort() for t in threads: t.join()
這個例子將Barrier配置為多加一個線程,即需要比實際啟動的線程再多一個參與線程,所以所有線程中的處理都會阻塞。在被阻塞的各個線程中,abort()調(diào)用會產(chǎn)生一個異常。
1.10 限制資源的并發(fā)訪問
有時可能需要允許多個工作線程同時訪問一個資源,但要限制總數(shù)。例如,連接池支持同時連接,但數(shù)目可能是固定的,或者一個網(wǎng)絡應用可能支持固定數(shù)目的并發(fā)下載。這些連接就可以使用Semaphore來管理。
import logging import threading import time class ActivePool: def __init__(self): super(ActivePool, self).__init__() self.active = [] self.lock = threading.Lock() def makeActive(self, name): with self.lock: self.active.append(name) logging.debug('Running: %s', self.active) def makeInactive(self, name): with self.lock: self.active.remove(name) logging.debug('Running: %s', self.active) def worker(s, pool): logging.debug('Waiting to join the pool') with s: name = threading.current_thread().getName() pool.makeActive(name) time.sleep(0.1) pool.makeInactive(name) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s', ) pool = ActivePool() s = threading.Semaphore(2) for i in range(4): t = threading.Thread( target=worker, name=str(i), args=(s, pool), ) t.start()
在這個例子中,ActivePool類只作為一種便利方法,用來跟蹤某個給定時刻哪些線程能夠運行。真正的資源池會為新的活動線程分配一個連接或另外某個值,并且當這個線程工作完成時再回收這個值。在這里,資源池只是用來保存活動線程的名,以顯示至少有兩個線程在并發(fā)運行。
1.11 線程特定的數(shù)據(jù)
有些資源需要鎖定以便多個線程使用,另外一些資源則需要保護,以使它們對并非是這些資源的“所有者”的線程隱藏。local()函數(shù)會創(chuàng)建一個對象,它能夠隱藏值,使其在不同線程中無法被看到。
import random import threading import logging def show_value(data): try: val = data.value except AttributeError: logging.debug('No value yet') else: logging.debug('value=%s', val) def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) local_data = threading.local() show_value(local_data) local_data.value = 1000 show_value(local_data) for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
屬性local_data.value
對所有線程都不可見,除非在某個線程中設置了這個屬性,這個線程才能看到它。
要初始化設置以使所有線程在開始時都有相同的值,可以使用一個子類,并在_init_()中設置這些屬性。
import random import threading import logging def show_value(data): try: val = data.value except AttributeError: logging.debug('No value yet') else: logging.debug('value=%s', val) def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data) class MyLocal(threading.local): def __init__(self, value): super().__init__() logging.debug('Initializing %r', self) self.value = value logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) local_data = MyLocal(1000) show_value(local_data) for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
這會在相同的對象上調(diào)用_init_()(注意id()值),每個線程中調(diào)用一次以設置默認值。
總結(jié)
到此這篇關于Python3標準庫:threading進程中管理并發(fā)操作的文章就介紹到這了,更多相關Python3標準庫:threading進程中管理并發(fā)操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python tkinter之Bind(綁定事件)的使用示例
這篇文章主要介紹了Python tkinter之Bind(綁定事件)的使用詳解,幫助大家更好的理解和學習python的gui開發(fā),感興趣的朋友可以了解下2021-02-02Django框架CBV裝飾器中間件auth模塊CSRF跨站請求問題
這篇文章主要介紹了Django CBV裝飾器 中間件 auth模塊 CSRF跨站請求,本文給大家介紹給CBV添加裝飾器有三種方法,三種方法都需要導入模塊,具體操作方法跟隨小編一起看看考下2021-08-08Python GUI教程之在PyQt5中使用數(shù)據(jù)庫的方法
Qt平臺對SQL編程有著良好的支持,PyQt5也一并繼承了過來,這篇文章主要介紹了Python GUI教程之在PyQt5中使用數(shù)據(jù)庫的方法,需要的朋友可以參考下2021-09-09深入淺析python 協(xié)程與go協(xié)程的區(qū)別
這篇文章主要介紹了python 協(xié)程與go協(xié)程的區(qū)別 ,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-05-05python3.8+django2+celery5.2.7環(huán)境準備(python測試開發(fā)django)
這篇文章主要介紹了python測試開發(fā)django之python3.8+django2+celery5.2.7環(huán)境準備工作,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07