詳解python中的線程與線程池
線程
進(jìn)程和線程
什么是進(jìn)程?
進(jìn)程就是正在運(yùn)行的程序, 一個(gè)任務(wù)就是一個(gè)進(jìn)程, 進(jìn)程的主要工作是管理資源, 而不是實(shí)現(xiàn)功能
什么是線程?
線程的主要工作是去實(shí)現(xiàn)功能, 比如執(zhí)行計(jì)算.
線程和進(jìn)程的關(guān)系就像員工與老板的關(guān)系,
老板(進(jìn)程) 提供資源 和 工作空間,
員工(線程) 負(fù)責(zé)去完成相應(yīng)的任務(wù)
特點(diǎn)
一個(gè)進(jìn)程至少由一個(gè)線程, 這一個(gè)必須存在的線程被稱為主線程, 同時(shí)一個(gè)進(jìn)程也可以有多個(gè)線程, 即多線程
當(dāng)我們我們遇到一些需要重復(fù)執(zhí)行的代碼時(shí), 就可以使用多線程分擔(dān)一些任務(wù), 進(jìn)而加快運(yùn)行速度
線程的實(shí)現(xiàn)
線程模塊
Python通過(guò)兩個(gè)標(biāo)準(zhǔn)庫(kù)_thread和threading, 提供對(duì)線程的支持 , threading對(duì)_thread進(jìn)行了封裝。
threading模塊中提供了Thread , Lock , RLock , Condition等組件。
因此在實(shí)際的使用中我們一般都是使用threading來(lái)實(shí)現(xiàn)多線程
線程包括子線程和主線程:
主線程 : 當(dāng)一個(gè)程序啟動(dòng)時(shí) , 就有一個(gè)線程開始運(yùn)行 , 該線程通常叫做程序的主線程
子線程 : 因?yàn)槌绦蚴情_始時(shí)就執(zhí)行的 , 如果你需要再創(chuàng)建線程 , 那么創(chuàng)建的線程就是這個(gè)主線程的子線程
主線程的重要性體現(xiàn)在兩方面 :
- 是產(chǎn)生其他子線程的線程
- 通常它必須最后完成執(zhí)行, 比如執(zhí)行各種關(guān)閉操作
Thread類
常用參數(shù)說(shuō)明
參數(shù) | 說(shuō)明 |
---|---|
target | 表示調(diào)用的對(duì)象, 即子線程要執(zhí)行的任務(wù), 可以是某個(gè)內(nèi)置方法, 或是你自己寫的函數(shù) |
name | 子線程的名稱 |
args | 傳入target函數(shù)中的位置參數(shù), 是一個(gè)元組, 參數(shù)后必須加逗號(hào) |
常用實(shí)例方法
方法 | 作用 |
---|---|
Thread.run(self) | 線程啟動(dòng)時(shí)運(yùn)行的方法, 由該方法調(diào)用 target參數(shù)所指定的函數(shù) |
Thread.start(self) | 啟動(dòng)進(jìn)程, start方法就是區(qū)幫你調(diào)用run方法 |
Thread.terminate(self) | 強(qiáng)制終止線程 |
Thread.join(self, timeout=None) | 阻塞調(diào)用, 主線程進(jìn)行等待 |
Thread.setDaemon(self, daemonic) | 將子線程設(shè)置為守護(hù)線程, 隨主線程結(jié)束而結(jié)束 |
Thread.getName(self, name) | 獲取線程名 |
Thread.setName(self, name) | 設(shè)置線程名 |
創(chuàng)建線程
在python中創(chuàng)建線程有兩種方式, 實(shí)例Thread類和繼承重寫Thread類
實(shí)例Thread類
import threading import time def run(name, s): # 線程要執(zhí)行的任務(wù) time.sleep(s) # 停兩秒 print('I am %s' % name) # 實(shí)例化線程類, 并傳入函數(shù)及其參數(shù), t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 開始執(zhí)行, 這兩個(gè)線程會(huì)同步執(zhí)行 t1.start() t2.start() print(t1.getName()) # 獲取線程名 print(t2.getName()) # Result: one two I am Two # 運(yùn)行2s后 I am One # 運(yùn)行5s后
繼承Thread類
class MyThread(threading.Thread): # 繼承threading中的Thread類 # 線程所需的參數(shù) def __init__(self, name, second): super().__init__() self.name = name self.second = second # 重寫run方法,表示線程所執(zhí)行的任務(wù),必須有 def run(self): time.sleep(self.second) print('I am %s' % self.name) # 創(chuàng)建線程實(shí)例 t1 = MyThread('One', 5) t2 = MyThread('Two', 2) # 啟動(dòng)線程,實(shí)際上是調(diào)用了類中的run方法 t1.start() t2.start() t1.join() print(t1.getName()) print(t2.getName()) # Result: I am Two # 運(yùn)行后2s I am One # 運(yùn)行后5s One Two
常用方法
join()
阻塞調(diào)用程序 , 直到調(diào)用join () 方法的線程執(zhí)行結(jié)束, 才會(huì)繼續(xù)往下執(zhí)行
# 開始執(zhí)行, 這兩個(gè)線程會(huì)同步執(zhí)行 t1.start() t2.start() t1.join() # 等待t1線程執(zhí)行完畢,再繼續(xù)執(zhí)行剩余的代碼 print(t1.getName()) print(t2.getName()) # Result: I am Two I am One one two
setDemon()
使用給線程設(shè)置守護(hù)模式: 子線程跟隨主線程的結(jié)束而結(jié)束, 不管這個(gè)子線程任務(wù)是否完成. 而非守護(hù)模式的子線程只有在執(zhí)行完成后, 主線程才會(huì)執(zhí)行完成
setDaemon() 與 join() 基本上是相對(duì)的 , join會(huì)等子線程執(zhí)行完畢 ; 而setDaemon則不會(huì)等
def run(name, s): # 線程要執(zhí)行的函數(shù) time.sleep(s) # 停兩秒 print('I am %s' % name) # 實(shí)例化線程類, 并傳入函數(shù)及其參數(shù) t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 給t1設(shè)置守護(hù)模式, 使其隨著主線程的結(jié)束而結(jié)束 t1.setDaemon(True) # 開始執(zhí)行, 這兩個(gè)線程會(huì)同步執(zhí)行 t1.start() t2.start() # 主線程會(huì)等待未設(shè)置守護(hù)模式的線程t2執(zhí)行完成 # Result: I am Two # 運(yùn)行后2s
線程間的通信
互斥鎖
在同一個(gè)進(jìn)程的多線程中 , 其中的變量對(duì)于所有線程來(lái)說(shuō)都是共享的 , 因此 , 如果多個(gè)線程之間同時(shí)修改一個(gè)變量 , 那就亂套了 , 共享的數(shù)據(jù)就會(huì)有很大的風(fēng)險(xiǎn) , 所以我們需要互斥鎖 , 來(lái)鎖住數(shù)據(jù) , 防止篡改。
來(lái)看一個(gè)錯(cuò)誤的示范:
a = 0 def incr(n): global a for i in range(n): a += 1 # 這兩個(gè)方法同時(shí)聲明了變量a,并對(duì)其進(jìn)行修改 def decr(n): global a for i in range(n): a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # 期望結(jié)果應(yīng)該是0, 但是因?yàn)檫@里沒(méi)有設(shè)置互斥鎖, 所以兩個(gè)方法是同時(shí)對(duì)同一個(gè)變量進(jìn)行修改, 得到的的結(jié)果值是隨機(jī)的
下面我們改一下上面的代碼 , 兩個(gè)方法加上互斥鎖:
a = 0 lock = threading.Lock() # 實(shí)例化互斥鎖對(duì)象, 方便之后的調(diào)用 def incr(n): global a for i in range(n): lock.acquire() # 上鎖的方法 a += 1 lock.release() # 解鎖的方法 # 要注意的是上鎖的位置是, 出現(xiàn)修改操作的代碼 def decr(n): global a for i in range(n): with lock: # 也可以直接使用with, 自動(dòng)解鎖 a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # Result: 0
在容易出現(xiàn)搶奪資源的地方進(jìn)行上鎖 , 實(shí)現(xiàn)同一時(shí)間內(nèi) , 只有一個(gè)線程可以對(duì)對(duì)象進(jìn)行操作
常用方法
關(guān)鍵字 | 解釋 |
---|---|
put(item) | 入隊(duì) , 將item放入隊(duì)列中 , 在隊(duì)列為滿時(shí)插入值會(huì)發(fā)生阻塞(1) |
get() | 出隊(duì) , 從隊(duì)列中移除并返回一個(gè)數(shù)據(jù) , 在隊(duì)列為空時(shí)獲取值會(huì)發(fā)生阻塞 |
task_done() | 任務(wù)結(jié)束 , 意味著之前入隊(duì)的一個(gè)任務(wù)已經(jīng)完成。由隊(duì)列的消費(fèi)者線程調(diào)用 |
join() | 等待完成 , 阻塞調(diào)用線程,直到隊(duì)列中的所有任務(wù)被處理掉。 |
empty() | 如果隊(duì)列為空,返回True,反之返回False |
full() | 如果隊(duì)列為滿,返回True,反之返回False |
qsize() | 隊(duì)列長(zhǎng)度 , 返回當(dāng)前隊(duì)列的數(shù)據(jù)量 |
(1): 阻塞: 程序停在阻塞的位置 , 無(wú)法繼續(xù)執(zhí)行
導(dǎo)入和實(shí)例化
import queue q = queue.Queue(4) # 實(shí)例化隊(duì)列對(duì)象, 并設(shè)置最大數(shù)據(jù)量
put() 和 get()
q.put('a') q.put('b') print(q.get()) # : a print(q.get()) # : b q.task_done() # get后必須要加task_done,確認(rèn)get操作是否完成
q.put(1) # 當(dāng)前隊(duì)列已滿,再次put就會(huì)阻塞 print(q.full()) # 由于已經(jīng)阻塞, 所以這段不會(huì)被執(zhí)行 # put會(huì)在隊(duì)列慢了點(diǎn)時(shí)候,在插入值會(huì)發(fā)生阻塞 # get會(huì)在隊(duì)列里沒(méi)有值的時(shí)候,會(huì)發(fā)生阻塞
empty()
print(q.empty()) # 判斷隊(duì)列是否為空: True q.put('test') print(q.empty()) # : False
qsize()
print(q.qsize()) # 當(dāng)前隊(duì)列里有多少人: 1
full()
q.put(1) q.put(1) q.put(1) print(q.full()) # : True
join()
print('testetsetset') q.join() # join會(huì)在隊(duì)列非空時(shí)發(fā)生阻塞 print('done') # 由于已經(jīng)阻塞, 所以這段不會(huì)被執(zhí)行
線程池
池的概念
線程池中實(shí)現(xiàn)準(zhǔn)備好了一些可以重復(fù)使用的線程 , 等待接受任務(wù)并執(zhí)行
主線程提交任務(wù)給 線程池 , 線程池中的每個(gè)線程會(huì)一次一個(gè)的接收任務(wù)并執(zhí)行 , 直到主線程執(zhí)行結(jié)束
主線程: 相當(dāng)于生產(chǎn)者,只管向線程池提交任務(wù)。
并不關(guān)心線程池是如何執(zhí)行任務(wù)的。
因此,并不關(guān)心是哪一個(gè)線程執(zhí)行的這個(gè)任務(wù)。
線程池: 相當(dāng)于消費(fèi)者,負(fù)責(zé)接收任務(wù),
并將任務(wù)分配到一個(gè)空閑的線程中去執(zhí)行。
自定義線程池
import queue import threading import time class ThreadPool: # 自定義線程池 def __init__(self, n): # 主線程做 self.queue_obj = queue.Queue() for i in range(n): threading.Thread(target=self.worker, daemon=True).start() # 給子線程worker設(shè)置為守護(hù)模式 def worker(self): # 子線程做,由于Debug調(diào)試的只是主線程的代碼,所以在調(diào)試時(shí)看不到子線程執(zhí)行的代碼 """線程對(duì)象,寫while True 是為了能夠一直執(zhí)行任務(wù)。""" while True: # 讓線程執(zhí)行完一個(gè)任務(wù)之后不會(huì)死掉,主線程結(jié)束時(shí),守護(hù)模式會(huì)讓worker里的死循環(huán)停止 func = self.queue_obj.get() # get已經(jīng)入隊(duì)的任務(wù), 這里會(huì)接收到主線程分配的func # 由于設(shè)置了守護(hù)模式,當(dāng)隊(duì)列為空時(shí),不會(huì)一直阻塞在get這里 # 有了守護(hù)模式,worker會(huì)在主線程執(zhí)行完畢后死掉 func() # 將隊(duì)列里的任務(wù)拿出來(lái)調(diào)用 """ 這里func與task_done的順序非常重要,如果func放在task_done后面的話會(huì)出現(xiàn)只執(zhí)行兩次就結(jié)束。 """ self.queue_obj.task_done() # task_done 會(huì)刷新計(jì)數(shù)器 # 線程池里有一個(gè)類似計(jì)數(shù)器的機(jī)制,用來(lái)記錄put的次數(shù)(+1),每一次task_done都會(huì)回?fù)芤淮斡涗浀拇螖?shù)(-1) # 當(dāng)回?fù)芡暧?jì)數(shù)器為0之后,就會(huì)執(zhí)行join def apply_async(self, func): # 主線程做 """向隊(duì)列中傳入需要執(zhí)行的函數(shù)對(duì)象""" self.queue_obj.put(func) # 將接收到的func入隊(duì) def join(self): # 主線程做 """等待隊(duì)列中的內(nèi)容被取完""" self.queue_obj.join() # 隊(duì)列里不為空就阻塞,為空就不阻塞
簡(jiǎn)單使用
def task1(): # 子線程做 time.sleep(2) print('task1 over') def task2(): # 子線程做 time.sleep(3) print('task2 over') P = ThreadPool(2) # 如果在start開啟線程之后沒(méi)有傳入任務(wù)對(duì)象,worker里的get會(huì)直接阻塞 P.apply_async(task1) P.apply_async(task2) print('start') P.join() print('done') # Result: start task1 over task2 over done
如果get發(fā)生阻塞意味著隊(duì)列為空,意味著join不阻塞,意味著print('done')會(huì)執(zhí)行,
意味著主線程沒(méi)有任務(wù)在做,意味著主線程結(jié)束,意味著不等待設(shè)置了守護(hù)的線程執(zhí)行任務(wù),
意味著子線程會(huì)隨著主線程的死亡而死亡,這就是為什么會(huì)設(shè)置守護(hù)模式。
如果沒(méi)有設(shè)置守護(hù)模式意味著get發(fā)生阻塞,意味著子線程任務(wù)執(zhí)行不完,意味著主線程一直要等子線程完成,
意味著程序一直都結(jié)束不了,意味著程序有問(wèn)題
python內(nèi)置線程池
原理
- 創(chuàng)建線程池
- 將任務(wù)扔進(jìn)去
- 關(guān)閉線程池
- 等待線程任務(wù)執(zhí)行完畢
'''手動(dòng)實(shí)現(xiàn)線程池:
主要是配合隊(duì)列來(lái)進(jìn)行實(shí)現(xiàn),我們定義好一個(gè)隊(duì)列對(duì)象,然后將我們的任務(wù)對(duì)象put到我們的隊(duì)列對(duì)象中,
然后使用多線程,讓我們的線程去get隊(duì)列種的對(duì)象,然后各自去執(zhí)行自己get到的任務(wù),
這樣的話其實(shí)也就實(shí)現(xiàn)了線程池
'''
使用方法
from multiprocessing.pool import ThreadPool import time pool = ThreadPool(2) # 直接使用內(nèi)置線程池, 設(shè)置最大線程數(shù) def task1(): time.sleep(2) print('task1 over') def task2(*args, **kwargs): time.sleep(3) print('task2 over', args, kwargs) pool.apply_async(task1) pool.apply_async(task2, args=(1, 2), kwds={'a': 1, 'b': 2}) print('Task Submitted') pool.close() # 要點(diǎn): close必須要在join之前, 不允許再提交任務(wù)了 pool.join() print('Mission Complete') # Result: Task Submitted task1 over task2 over (1, 2) {'a': 1, 'b': 2} Mission Complete
其他操作
操作一: close - 關(guān)閉提交通道,不允許再提交任務(wù)
操作二: terminate - 中止進(jìn)程池,中止所有任務(wù)
以上所述是小編給大家介紹的python線程與線程池詳解整合,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
- 解決python ThreadPoolExecutor 線程池中的異常捕獲問(wèn)題
- Python線程池模塊ThreadPoolExecutor用法分析
- 實(shí)例代碼講解Python 線程池
- Python 如何創(chuàng)建一個(gè)線程池
- python線程池如何使用
- Python定時(shí)器線程池原理詳解
- python Event事件、進(jìn)程池與線程池、協(xié)程解析
- Python 線程池用法簡(jiǎn)單示例
- python線程池threadpool實(shí)現(xiàn)篇
- python線程池threadpool使用篇
- python線程池 ThreadPoolExecutor 的用法示例
相關(guān)文章
編寫Python腳本把sqlAlchemy對(duì)象轉(zhuǎn)換成dict的教程
這篇文章主要介紹了編寫Python腳本把sqlAlchemy對(duì)象轉(zhuǎn)換成dict的教程,主要是基于Python的model類構(gòu)建一個(gè)轉(zhuǎn)換的方法,需要的朋友可以參考下2015-05-05Python簡(jiǎn)單實(shí)現(xiàn)gif動(dòng)圖倒放示例
這篇文章主要為大家介紹了Python簡(jiǎn)單實(shí)現(xiàn)gif動(dòng)圖倒放的示例過(guò)程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05python數(shù)據(jù)庫(kù)批量插入數(shù)據(jù)的實(shí)現(xiàn)(executemany的使用)
這篇文章主要介紹了python數(shù)據(jù)庫(kù)批量插入數(shù)據(jù)的實(shí)現(xiàn)(executemany的使用),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04基于python實(shí)現(xiàn)操作git過(guò)程代碼解析
這篇文章主要介紹了基于python實(shí)現(xiàn)操作git過(guò)程代碼解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07一行Python3代碼實(shí)現(xiàn)解析地址信息
在日常數(shù)據(jù)處理的過(guò)程中,有時(shí)候拿到手的是完整的地址信息,如果需要從地址信息中解析出相應(yīng)的各級(jí)行政單元名稱,方式有很多,而今天要介紹的方式只需要一行代碼即可快速實(shí)現(xiàn),快跟隨小編一起學(xué)習(xí)一下吧2022-05-05解決python報(bào)錯(cuò):AttributeError:?'ImageDraw'?object?h
這篇文章主要給大家介紹了關(guān)于解決python報(bào)錯(cuò):AttributeError:?'ImageDraw'?object?has?no?attribute?'textbbox'的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01