python并發(fā)編程之線程實(shí)例解析
常用用法
t.is_alive()
Python中線程會在一個單獨(dú)的系統(tǒng)級別線程中執(zhí)行(比如一個POSIX線程或者一個Windows線程)
這些線程將由操作系統(tǒng)來全權(quán)管理。線程一旦啟動,將獨(dú)立執(zhí)行直到目標(biāo)函數(shù)返回。可以通過查詢
一個線程對象的狀態(tài),看它是否還在執(zhí)行t.is_alive()
t.join()
可以把一個線程加入到當(dāng)前線程,并等待它終止
Python解釋器在所有線程都終止后才繼續(xù)執(zhí)行代碼剩余的部分
daemon
對于需要長時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺任務(wù),可以用后臺線程(也稱為守護(hù)線程)
例:
t=Thread(target=func,args(1,),daemon=True)
t.start()
后臺線程無法等待,這些線程會在主線程終止時(shí)自動銷毀
小結(jié):
后臺線程無法等待,不過,這些線程會在主線程終止時(shí)自動銷毀。你無法結(jié)束一個線程,無法給它發(fā)送信
號,無法調(diào)整它的調(diào)度,也無法執(zhí)行其他高級操作。如果需要這些特性,你需要自己添加。比如說,
如果你需要終止線程,那么這個線程必須通過編程在某個特定點(diǎn)輪詢來退出
如果線程執(zhí)行一些像I/O這樣的阻塞操作,那么通過輪詢來終止線程將使得線程之間的協(xié)調(diào)變得非常棘手。
比如,如果一個線程一直阻塞在一個I/O操作上,它就永遠(yuǎn)無法返回,也就無法檢查自己是否已經(jīng)被結(jié)束了。
要正確處理這些問題,需要利用超時(shí)循環(huán)來小心操作線程。
線程間通信
queue
一個線程向另外一個線程發(fā)送數(shù)據(jù)最安全的方式應(yīng)該就是queue庫中的隊(duì)列
先看一下使用例子,這里是一個簡單的生產(chǎn)者和消費(fèi)者模型:
from queue import Queue from threading import Thread import random import time _sentinel = object() def producer(out_q): n = 10 while n: time.sleep(1) data = random.randint(0, 10) out_q.put(data) print("生產(chǎn)者生產(chǎn)了數(shù)據(jù){0}".format(data)) n -= 1 out_q.put(_sentinel) def consumer(in_q): while True: data = in_q.get() print("消費(fèi)者消費(fèi)了{(lán)0}".format(data)) if data is _sentinel: in_q.put(_sentinel) break q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
上述代碼中設(shè)置了一個特殊值_sentinel用于當(dāng)獲取到這個值的時(shí)候終止執(zhí)行
關(guān)于queue的功能有個需要注意的地方:
Queue對象雖然已經(jīng)包含了必要的鎖,主要有q.put和q.get
而q.size(),q.full(),q.empty()等方法不是線程安全的
使用隊(duì)列進(jìn)行線程通信是一個單向、不確定的過程。通常情況下,是沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。但是隊(duì)列提供了一些基本的特性:q.task_done()和q.join()
如果一個線程需要在另外一個線程處理完特定的數(shù)據(jù)任務(wù)后立即得到通知,可以把要發(fā)送的數(shù)據(jù)和一個Event放到一起使用
關(guān)于線程中的Event
線程有一個非常關(guān)鍵的特性:每個線程都是獨(dú)立運(yùn)行的,且狀態(tài)不可預(yù)測
如果程序中的其他線程需要通過判斷每個線程的狀態(tài)來確定自己下一步的操作,這時(shí)線程同步問題就會比較麻煩。
解決方法:
使用threading庫中的Event
Event對象包含一個可由線程設(shè)置的信號標(biāo)志,它允許線程等待某些事件的發(fā)生。
在初始化狀態(tài)下,event對象中的信號標(biāo)志被設(shè)置為假。
如果有線程等待一個event對象,而這個event的標(biāo)志為假,這個線程將一直被阻塞知道該標(biāo)志為真。
一個線程如果把event對象的標(biāo)志設(shè)置為真,就會喚醒所有等待這個event對象的線程。
通過一個代碼例子理解:
from threading import Thread, Event import time def countdown(n, started_evt): print("countdown starting") # set將event的標(biāo)識設(shè)置為True started_evt.set() while n > 0: print("T-mins", n) n -= 1 time.sleep(2) # 初始化的started_evt為False started_evt = Event() print("Launching countdown") t = Thread(target=countdown, args=(10, started_evt,)) t.start() # 會一直等待直到event的標(biāo)志為True的時(shí)候 started_evt.wait() print("countdown is running")
而結(jié)果,我們也可以看出當(dāng)線程執(zhí)行了set之后,才打印running
實(shí)際用event對象最好是單次使用,創(chuàng)建一個event對象,讓某個線程等待這個對象,一旦對象被設(shè)置為Tru,就應(yīng)該丟棄它,我們雖然可以通過clear()方法重置event對象,但是這個沒法確保安全的清理event對象并對它進(jìn)行重新的賦值。會發(fā)生錯過事件,死鎖等各種問題。
event對象的一個重要特點(diǎn)是它被設(shè)置為True時(shí)會喚醒所有等待它的線程,如果喚醒單個線程的最好用Condition或信號量Semaphore
和event功能類似的線程中還有一個Condition
關(guān)于線程中的Condition
關(guān)于Condition官網(wǎng)的一段話:
Aconditionvariableisalwaysassociatedwithsomekindoflock;thiscanbepassedinoronewillbecreatedbydefault.Passingoneinisusefulwhenseveralconditionvariablesmustsharethesamelock.Thelockispartoftheconditionobject:youdon'thavetotrackitseparately.
Othermethodsmustbecalledwiththeassociatedlockheld.Thewait()methodreleasesthelock,andthenblocksuntilanotherthreadawakensitbycallingnotify()ornotify_all().Onceawakened,wait()re-acquiresthelockandreturns.Itisalsopossibletospecifyatimeout.
但是需要注意的是:
notify()andnotify_all()這兩個方法,不會釋放鎖,這意味著線程或者被喚醒的線程不會立刻執(zhí)行wait()
我們可以通過Conditon對象實(shí)現(xiàn)一個周期定時(shí)器的功能,每當(dāng)定時(shí)器超時(shí)的時(shí)候,其他線程都可以檢測到,代碼例子如下:
import threading import time class PeriodicTimer: """ 這里做了一個定時(shí)器 """ def __init__(self, interval): self._interval = interval self._flag = 0 self._cv = threading.Condition() def start(self): t = threading.Thread(target=self.run) t.daemon = True t.start() def run(self): while True: time.sleep(self._interval) with self._cv: # 這個點(diǎn)還是非常有意思的^= self._flag ^= 1 self._cv.notify_all() def wait_for_tick(self): with self._cv: last_flag = self._flag while last_flag == self._flag: self._cv.wait() # 下面兩個分別為兩個需要定時(shí)執(zhí)行的任務(wù) def countdown(nticks): while nticks > 0: ptimer.wait_for_tick() print('T-minus', nticks) nticks -= 1 def countup(last): n = 0 while n < last: ptimer.wait_for_tick() print('Counting', n) n += 1 ptimer = PeriodicTimer(5) ptimer.start() threading.Thread(target=countdown, args=(10,)).start() threading.Thread(target=countup, args=(5,)).start()
關(guān)于線程中鎖的使用
要在多線程中安全使用可變對象,需要使用threading庫中的Lock對象
先看一個關(guān)于鎖的基本使用:
import threading class SharedCounter: def __init__(self, initial_value=0): self._value = initial_value self._value_lock = threading.Lock() def incr(self,delta = 1): with self._value_lock: self._value += delta def decr(self, delta=1): with self._value_lock: self._value -= delta
Lock對象和with語句塊一起使用可以保證互斥執(zhí)行,這樣每次就只有一個線程可以執(zhí)行with語句包含的代碼塊。with語句會在這個代碼快執(zhí)行前自動獲取鎖,在執(zhí)行結(jié)束后自動釋放所。
線程的調(diào)度本質(zhì)上是不確定的,因此,在多線程程序中錯誤的使用鎖機(jī)制可能會導(dǎo)致隨機(jī)數(shù)據(jù)
損壞或者其他異常錯誤,我們稱之為競爭條件
你可能看到有些“老python程序員”
還是通過_value_lock.acquire()和_value_lock.release(),明顯看來
還是with更加方便,不容易出錯,畢竟你無法保證那次就忘記釋放鎖了
為了避免死鎖,使用鎖機(jī)制的程序應(yīng)該設(shè)定每個線程一次只能獲取一個鎖
threading庫中還提供了其他的同步原語:RLock,Semaphore對象。但是這兩個使用場景相對來說比較特殊
RLock(可重入鎖)可以被同一個線程多次獲取,主要用來實(shí)現(xiàn)基于檢測對象模式的鎖定和同步。在使用這種鎖的時(shí)候,當(dāng)鎖被持有時(shí),只有一個線程可以使用完整的函數(shù)或者類中的方法,例子如下:
import threading class SharedCounter: _lock = threading.RLock() def __init__(self,initial_value=0): self._value = initial_value def incr(self,delta=1): with SharedCounter._lock: self._value += delta def decr(self,delta=1): with SharedCounter._lock: self.incr(-delta)
這個例子中的鎖是一個類變量,也就是所有實(shí)例共享的類級鎖,這樣就保證了一次只有一個線程可以調(diào)用這個類的方法。與標(biāo)準(zhǔn)鎖不同的是已經(jīng)持有這個鎖的方法再調(diào)用同樣適用這個鎖的方法時(shí),無需再次獲取鎖,例如上面例子中的decr方法。
這種方法的特點(diǎn)是:無論這個類有多少實(shí)例都使用一個鎖。因此在需要使用大量使用計(jì)數(shù)器的情況下內(nèi)存效率更高。
缺點(diǎn):在程序中使用大量線程并頻繁更新計(jì)數(shù)器時(shí)會有競爭用鎖的問題。
信號量對象是一個建立在共享計(jì)數(shù)器基礎(chǔ)上的同步原語,如果計(jì)數(shù)器不為0,with語句講計(jì)數(shù)器減1,
線程被允許執(zhí)行。with語句執(zhí)行結(jié)束后,計(jì)數(shù)器加1。如果計(jì)數(shù)器為0,線程將被阻塞,直到其他線程結(jié)束并將計(jì)數(shù)器加1。但是信號量不推薦使用,增加了復(fù)雜性,影響程序性能。
所以信號量更適用于哪些需要在線程之間引入信號或者限制的程序。例如限制一段代碼的并發(fā)量
from threading import Semaphore import requests _fetch_url_sema = Semaphore(5) def fetch_url(url): with _fetch_url_sema: return requests.get(url)
關(guān)于防止死鎖的加鎖機(jī)制
在多線程程序中,死鎖問題很大一部分是由于多線程同時(shí)獲取多個鎖造成的。
舉個例子:一個線程獲取一個第一個鎖,在獲取第二個鎖的時(shí)候發(fā)生阻塞,那么這個線程就可能阻塞其他線程執(zhí)行,從而導(dǎo)致整個程序假死。
一種解決方法:為程序中每一個鎖分配一個唯一的id,然后只允許按照升序規(guī)則來使用多個鎖。
import threading from contextlib import contextmanager # 存儲已經(jīng)請求鎖的信息 _local = threading.local() @contextmanager def acquire(*locks): # 把鎖通過id進(jìn)行排序 locks = sorted(locks, key=lambda x: id(x)) acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError("Lock order Violation") acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: for lock in reversed(locks): lock.release() del acquired[-len(locks):] x_lock = threading.Lock() y_lock = threading.Lock() def thread_1(): while True: with acquire(x_lock,y_lock): print("Thread-1") def thread_2(): while True: with acquire(y_lock,x_lock): print("Thread-2") t1 = threading.Thread(target=thread_1) t1.daemon = True t1.start() t2 = threading.Thread(target=thread_2) t2.daemon = True t2.start()
通過排序,不管以什么樣的順序來請求鎖,這些鎖都會按照固定的順序被獲取。
這里也用了thread.local()來保存請求鎖的信息
同樣的這個東西也可以用來保存線程的信息,而這個線程對其他的線程是不可見的
總結(jié)
以上就是本文關(guān)于python并發(fā)編程之線程實(shí)例解析的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
相關(guān)文章
基于Python Shell獲取hostname和fqdn釋疑
一直以來被linux的hostname和fqdn(Fully Qualified Domain Name)困惑著,今天通過腳本之家平臺把它們使用細(xì)節(jié)弄清分享給大家2016-01-01