python并發(fā)編程之線程實(shí)例解析
常用用法
t.is_alive()
Python中線程會(huì)在一個(gè)單獨(dú)的系統(tǒng)級(jí)別線程中執(zhí)行(比如一個(gè)POSIX線程或者一個(gè)Windows線程)
這些線程將由操作系統(tǒng)來(lái)全權(quán)管理。線程一旦啟動(dòng),將獨(dú)立執(zhí)行直到目標(biāo)函數(shù)返回??梢酝ㄟ^(guò)查詢
一個(gè)線程對(duì)象的狀態(tài),看它是否還在執(zhí)行t.is_alive()
t.join()
可以把一個(gè)線程加入到當(dāng)前線程,并等待它終止
Python解釋器在所有線程都終止后才繼續(xù)執(zhí)行代碼剩余的部分
daemon
對(duì)于需要長(zhǎng)時(shí)間運(yùn)行的線程或者需要一直運(yùn)行的后臺(tái)任務(wù),可以用后臺(tái)線程(也稱為守護(hù)線程)
例:
t=Thread(target=func,args(1,),daemon=True)
t.start()
后臺(tái)線程無(wú)法等待,這些線程會(huì)在主線程終止時(shí)自動(dòng)銷毀
小結(jié):
后臺(tái)線程無(wú)法等待,不過(guò),這些線程會(huì)在主線程終止時(shí)自動(dòng)銷毀。你無(wú)法結(jié)束一個(gè)線程,無(wú)法給它發(fā)送信
號(hào),無(wú)法調(diào)整它的調(diào)度,也無(wú)法執(zhí)行其他高級(jí)操作。如果需要這些特性,你需要自己添加。比如說(shuō),
如果你需要終止線程,那么這個(gè)線程必須通過(guò)編程在某個(gè)特定點(diǎn)輪詢來(lái)退出
如果線程執(zhí)行一些像I/O這樣的阻塞操作,那么通過(guò)輪詢來(lái)終止線程將使得線程之間的協(xié)調(diào)變得非常棘手。
比如,如果一個(gè)線程一直阻塞在一個(gè)I/O操作上,它就永遠(yuǎn)無(wú)法返回,也就無(wú)法檢查自己是否已經(jīng)被結(jié)束了。
要正確處理這些問題,需要利用超時(shí)循環(huán)來(lái)小心操作線程。
線程間通信
queue
一個(gè)線程向另外一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式應(yīng)該就是queue庫(kù)中的隊(duì)列
先看一下使用例子,這里是一個(gè)簡(jiǎn)單的生產(chǎn)者和消費(fèi)者模型:
from queue import Queue from threading import Thread import random import time _sentinel = object() def producer(out_q): n = 10 while n: time.sleep(1) data = random.randint(0, 10) out_q.put(data) print("生產(chǎn)者生產(chǎn)了數(shù)據(jù){0}".format(data)) n -= 1 out_q.put(_sentinel) def consumer(in_q): while True: data = in_q.get() print("消費(fèi)者消費(fèi)了{(lán)0}".format(data)) if data is _sentinel: in_q.put(_sentinel) break q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
上述代碼中設(shè)置了一個(gè)特殊值_sentinel用于當(dāng)獲取到這個(gè)值的時(shí)候終止執(zhí)行
關(guān)于queue的功能有個(gè)需要注意的地方:
Queue對(duì)象雖然已經(jīng)包含了必要的鎖,主要有q.put和q.get
而q.size(),q.full(),q.empty()等方法不是線程安全的
使用隊(duì)列進(jìn)行線程通信是一個(gè)單向、不確定的過(guò)程。通常情況下,是沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。但是隊(duì)列提供了一些基本的特性:q.task_done()和q.join()
如果一個(gè)線程需要在另外一個(gè)線程處理完特定的數(shù)據(jù)任務(wù)后立即得到通知,可以把要發(fā)送的數(shù)據(jù)和一個(gè)Event放到一起使用
關(guān)于線程中的Event
線程有一個(gè)非常關(guān)鍵的特性:每個(gè)線程都是獨(dú)立運(yùn)行的,且狀態(tài)不可預(yù)測(cè)
如果程序中的其他線程需要通過(guò)判斷每個(gè)線程的狀態(tài)來(lái)確定自己下一步的操作,這時(shí)線程同步問題就會(huì)比較麻煩。
解決方法:
使用threading庫(kù)中的Event
Event對(duì)象包含一個(gè)可由線程設(shè)置的信號(hào)標(biāo)志,它允許線程等待某些事件的發(fā)生。
在初始化狀態(tài)下,event對(duì)象中的信號(hào)標(biāo)志被設(shè)置為假。
如果有線程等待一個(gè)event對(duì)象,而這個(gè)event的標(biāo)志為假,這個(gè)線程將一直被阻塞知道該標(biāo)志為真。
一個(gè)線程如果把event對(duì)象的標(biāo)志設(shè)置為真,就會(huì)喚醒所有等待這個(gè)event對(duì)象的線程。
通過(guò)一個(gè)代碼例子理解:
from threading import Thread, Event import time def countdown(n, started_evt): print("countdown starting") # set將event的標(biāo)識(shí)設(shè)置為True started_evt.set() while n > 0: print("T-mins", n) n -= 1 time.sleep(2) # 初始化的started_evt為False started_evt = Event() print("Launching countdown") t = Thread(target=countdown, args=(10, started_evt,)) t.start() # 會(huì)一直等待直到event的標(biāo)志為True的時(shí)候 started_evt.wait() print("countdown is running")
而結(jié)果,我們也可以看出當(dāng)線程執(zhí)行了set之后,才打印running
實(shí)際用event對(duì)象最好是單次使用,創(chuàng)建一個(gè)event對(duì)象,讓某個(gè)線程等待這個(gè)對(duì)象,一旦對(duì)象被設(shè)置為Tru,就應(yīng)該丟棄它,我們雖然可以通過(guò)clear()方法重置event對(duì)象,但是這個(gè)沒法確保安全的清理event對(duì)象并對(duì)它進(jìn)行重新的賦值。會(huì)發(fā)生錯(cuò)過(guò)事件,死鎖等各種問題。
event對(duì)象的一個(gè)重要特點(diǎn)是它被設(shè)置為True時(shí)會(huì)喚醒所有等待它的線程,如果喚醒單個(gè)線程的最好用Condition或信號(hào)量Semaphore
和event功能類似的線程中還有一個(gè)Condition
關(guān)于線程中的Condition
關(guān)于Condition官網(wǎng)的一段話:
Aconditionvariableisalwaysassociatedwithsomekindoflock;thiscanbepassedinoronewillbecreatedbydefault.Passingoneinisusefulwhenseveralconditionvariablesmustsharethesamelock.Thelockispartoftheconditionobject:youdon'thavetotrackitseparately.
Othermethodsmustbecalledwiththeassociatedlockheld.Thewait()methodreleasesthelock,andthenblocksuntilanotherthreadawakensitbycallingnotify()ornotify_all().Onceawakened,wait()re-acquiresthelockandreturns.Itisalsopossibletospecifyatimeout.
但是需要注意的是:
notify()andnotify_all()這兩個(gè)方法,不會(huì)釋放鎖,這意味著線程或者被喚醒的線程不會(huì)立刻執(zhí)行wait()
我們可以通過(guò)Conditon對(duì)象實(shí)現(xiàn)一個(gè)周期定時(shí)器的功能,每當(dāng)定時(shí)器超時(shí)的時(shí)候,其他線程都可以檢測(cè)到,代碼例子如下:
import threading import time class PeriodicTimer: """ 這里做了一個(gè)定時(shí)器 """ def __init__(self, interval): self._interval = interval self._flag = 0 self._cv = threading.Condition() def start(self): t = threading.Thread(target=self.run) t.daemon = True t.start() def run(self): while True: time.sleep(self._interval) with self._cv: # 這個(gè)點(diǎn)還是非常有意思的^= self._flag ^= 1 self._cv.notify_all() def wait_for_tick(self): with self._cv: last_flag = self._flag while last_flag == self._flag: self._cv.wait() # 下面兩個(gè)分別為兩個(gè)需要定時(shí)執(zhí)行的任務(wù) def countdown(nticks): while nticks > 0: ptimer.wait_for_tick() print('T-minus', nticks) nticks -= 1 def countup(last): n = 0 while n < last: ptimer.wait_for_tick() print('Counting', n) n += 1 ptimer = PeriodicTimer(5) ptimer.start() threading.Thread(target=countdown, args=(10,)).start() threading.Thread(target=countup, args=(5,)).start()
關(guān)于線程中鎖的使用
要在多線程中安全使用可變對(duì)象,需要使用threading庫(kù)中的Lock對(duì)象
先看一個(gè)關(guān)于鎖的基本使用:
import threading class SharedCounter: def __init__(self, initial_value=0): self._value = initial_value self._value_lock = threading.Lock() def incr(self,delta = 1): with self._value_lock: self._value += delta def decr(self, delta=1): with self._value_lock: self._value -= delta
Lock對(duì)象和with語(yǔ)句塊一起使用可以保證互斥執(zhí)行,這樣每次就只有一個(gè)線程可以執(zhí)行with語(yǔ)句包含的代碼塊。with語(yǔ)句會(huì)在這個(gè)代碼快執(zhí)行前自動(dòng)獲取鎖,在執(zhí)行結(jié)束后自動(dòng)釋放所。
線程的調(diào)度本質(zhì)上是不確定的,因此,在多線程程序中錯(cuò)誤的使用鎖機(jī)制可能會(huì)導(dǎo)致隨機(jī)數(shù)據(jù)
損壞或者其他異常錯(cuò)誤,我們稱之為競(jìng)爭(zhēng)條件
你可能看到有些“老python程序員”
還是通過(guò)_value_lock.acquire()和_value_lock.release(),明顯看來(lái)
還是with更加方便,不容易出錯(cuò),畢竟你無(wú)法保證那次就忘記釋放鎖了
為了避免死鎖,使用鎖機(jī)制的程序應(yīng)該設(shè)定每個(gè)線程一次只能獲取一個(gè)鎖
threading庫(kù)中還提供了其他的同步原語(yǔ):RLock,Semaphore對(duì)象。但是這兩個(gè)使用場(chǎng)景相對(duì)來(lái)說(shuō)比較特殊
RLock(可重入鎖)可以被同一個(gè)線程多次獲取,主要用來(lái)實(shí)現(xiàn)基于檢測(cè)對(duì)象模式的鎖定和同步。在使用這種鎖的時(shí)候,當(dāng)鎖被持有時(shí),只有一個(gè)線程可以使用完整的函數(shù)或者類中的方法,例子如下:
import threading class SharedCounter: _lock = threading.RLock() def __init__(self,initial_value=0): self._value = initial_value def incr(self,delta=1): with SharedCounter._lock: self._value += delta def decr(self,delta=1): with SharedCounter._lock: self.incr(-delta)
這個(gè)例子中的鎖是一個(gè)類變量,也就是所有實(shí)例共享的類級(jí)鎖,這樣就保證了一次只有一個(gè)線程可以調(diào)用這個(gè)類的方法。與標(biāo)準(zhǔn)鎖不同的是已經(jīng)持有這個(gè)鎖的方法再調(diào)用同樣適用這個(gè)鎖的方法時(shí),無(wú)需再次獲取鎖,例如上面例子中的decr方法。
這種方法的特點(diǎn)是:無(wú)論這個(gè)類有多少實(shí)例都使用一個(gè)鎖。因此在需要使用大量使用計(jì)數(shù)器的情況下內(nèi)存效率更高。
缺點(diǎn):在程序中使用大量線程并頻繁更新計(jì)數(shù)器時(shí)會(huì)有競(jìng)爭(zhēng)用鎖的問題。
信號(hào)量對(duì)象是一個(gè)建立在共享計(jì)數(shù)器基礎(chǔ)上的同步原語(yǔ),如果計(jì)數(shù)器不為0,with語(yǔ)句講計(jì)數(shù)器減1,
線程被允許執(zhí)行。with語(yǔ)句執(zhí)行結(jié)束后,計(jì)數(shù)器加1。如果計(jì)數(shù)器為0,線程將被阻塞,直到其他線程結(jié)束并將計(jì)數(shù)器加1。但是信號(hào)量不推薦使用,增加了復(fù)雜性,影響程序性能。
所以信號(hào)量更適用于哪些需要在線程之間引入信號(hào)或者限制的程序。例如限制一段代碼的并發(fā)量
from threading import Semaphore import requests _fetch_url_sema = Semaphore(5) def fetch_url(url): with _fetch_url_sema: return requests.get(url)
關(guān)于防止死鎖的加鎖機(jī)制
在多線程程序中,死鎖問題很大一部分是由于多線程同時(shí)獲取多個(gè)鎖造成的。
舉個(gè)例子:一個(gè)線程獲取一個(gè)第一個(gè)鎖,在獲取第二個(gè)鎖的時(shí)候發(fā)生阻塞,那么這個(gè)線程就可能阻塞其他線程執(zhí)行,從而導(dǎo)致整個(gè)程序假死。
一種解決方法:為程序中每一個(gè)鎖分配一個(gè)唯一的id,然后只允許按照升序規(guī)則來(lái)使用多個(gè)鎖。
import threading from contextlib import contextmanager # 存儲(chǔ)已經(jīng)請(qǐng)求鎖的信息 _local = threading.local() @contextmanager def acquire(*locks): # 把鎖通過(guò)id進(jìn)行排序 locks = sorted(locks, key=lambda x: id(x)) acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError("Lock order Violation") acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: for lock in reversed(locks): lock.release() del acquired[-len(locks):] x_lock = threading.Lock() y_lock = threading.Lock() def thread_1(): while True: with acquire(x_lock,y_lock): print("Thread-1") def thread_2(): while True: with acquire(y_lock,x_lock): print("Thread-2") t1 = threading.Thread(target=thread_1) t1.daemon = True t1.start() t2 = threading.Thread(target=thread_2) t2.daemon = True t2.start()
通過(guò)排序,不管以什么樣的順序來(lái)請(qǐng)求鎖,這些鎖都會(huì)按照固定的順序被獲取。
這里也用了thread.local()來(lái)保存請(qǐng)求鎖的信息
同樣的這個(gè)東西也可以用來(lái)保存線程的信息,而這個(gè)線程對(duì)其他的線程是不可見的
總結(jié)
以上就是本文關(guān)于python并發(fā)編程之線程實(shí)例解析的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對(duì)本站的支持!
相關(guān)文章
基于Python Shell獲取hostname和fqdn釋疑
一直以來(lái)被linux的hostname和fqdn(Fully Qualified Domain Name)困惑著,今天通過(guò)腳本之家平臺(tái)把它們使用細(xì)節(jié)弄清分享給大家2016-01-01