Python線程池thread?pool創(chuàng)建使用及實(shí)例代碼分享
前言
首先線程和線程池不管在哪個語言里面,理論都是通用的。對于開發(fā)來說,解決高并發(fā)問題離不開對多個線程處理。我們先從線程到線程池,從每個線程的運(yùn)行到多個線程并行,再到線程池管理。由淺入深的理解如何在實(shí)際開發(fā)中,使用線程池來提高處理線程的效率。
一、線程
1.線程介紹
線程(英語:thread)是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位。一條線程指的是進(jìn)程中一個單一順序的控制流,一個進(jìn)程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)。在Unix System V及Sun中也被稱為輕量進(jìn)程(lightweight processes),但輕量進(jìn)程更多指內(nèi)核線程(kernel thread),而把用戶線程(user thread)稱為線程。
60年代,在OS中能擁有資源和獨(dú)立運(yùn)行的基本單位是進(jìn)程,然而隨著計算機(jī)技術(shù)的發(fā)展,進(jìn)程出現(xiàn)了很多弊端,一是由于進(jìn)程是資源擁有者,創(chuàng)建、撤消與切換存在較大的時空開銷,因此需要引入輕型進(jìn)程;二是由于對稱多處理機(jī)(SMP)出現(xiàn),可以滿足多個運(yùn)行單位,而多個進(jìn)程并行開銷過大。因此在80年代,出現(xiàn)了能獨(dú)立運(yùn)行的基本單位——線程(Threads)。
線程是獨(dú)立調(diào)度和分派的基本單位。線程可以為操作系統(tǒng)內(nèi)核調(diào)度的內(nèi)核線程,如Win32線程;由用戶進(jìn)程自行調(diào)度的用戶線程,如Linux平臺的POSIX Thread;或者由內(nèi)核與用戶進(jìn)程,如Windows 10的線程,進(jìn)行混合調(diào)度。
同一進(jìn)程中的多條線程將共享該進(jìn)程中的全部系統(tǒng)資源,如虛擬地址空間,文件描述符和信號處理等等。但同一進(jìn)程中的多個線程有各自的調(diào)用棧(call stack),自己的寄存器環(huán)境(register context),自己的線程本地存儲(thread-local storage)。
一個進(jìn)程可以有很多線程,每條線程并行執(zhí)行不同的任務(wù)。
2.線程特性
輕型實(shí)體
線程中的實(shí)體基本上不擁有系統(tǒng)資源,只是有一點(diǎn)必不可少的、能保證獨(dú)立運(yùn)行的資源。
線程的實(shí)體包括程序、數(shù)據(jù)和TCB。線程是動態(tài)概念,它的動態(tài)特性由線程控制塊TCB(Thread Control Block)描述。TCB包括以下信息:
- (1)線程狀態(tài)
- (2)當(dāng)線程不運(yùn)行時,被保存的現(xiàn)場資源。
- (3)一組執(zhí)行堆棧。
- (4)存放每個線程的局部變量主存區(qū)。
- (5)訪問同一個進(jìn)程中的主存和其它資源。
用于指示被執(zhí)行指令序列的、保留局部變量、少數(shù)狀態(tài)參數(shù)和返回地址等的一組寄存器和堆棧。
獨(dú)立調(diào)度和分派的基本單位
在多線程OS中,線程是能獨(dú)立運(yùn)行的基本單位,因而也是獨(dú)立調(diào)度和分派的基本單位。由于線程很“輕”,故線程的切換非常迅速且開銷?。ㄔ谕贿M(jìn)程中的)。
可并發(fā)執(zhí)行
在一個進(jìn)程中的多個線程之間,可以并發(fā)執(zhí)行,甚至允許在一個進(jìn)程中所有線程都能并發(fā)執(zhí)行;同樣,不同進(jìn)程中的線程也能并發(fā)執(zhí)行,充分利用和發(fā)揮了處理機(jī)與外圍設(shè)備并行工作的能力。
共享進(jìn)程資源
在同一進(jìn)程中的各個線程,都可以共享該進(jìn)程所擁有的資源,這首先表現(xiàn)在:所有線程都具有相同的地址空間(進(jìn)程的地址空間),這意味著,線程可以訪問該地址空間的每一個虛地址;此外,還可以訪問進(jìn)程所擁有的已打開文件、定時器等。由于同一個進(jìn)程內(nèi)的線程共享內(nèi)存和文件,所以線程之間互相通信不必調(diào)用內(nèi)核。
二、線程池
線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調(diào)度開銷,進(jìn)而影響緩存局部性和整體性能。而線程池維護(hù)著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時間任務(wù)時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度??捎镁€程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。 例如,線程數(shù)一般取cpu數(shù)量+2比較合適,線程數(shù)過多會導(dǎo)致額外的線程切換開銷。
任務(wù)調(diào)度以執(zhí)行線程的常見方法是使用同步隊(duì)列,稱作任務(wù)隊(duì)列。池中的線程等待隊(duì)列中的任務(wù),并把執(zhí)行完的任務(wù)放入完成隊(duì)列中。
線程池模式一般分為兩種:HS/HA半同步/半異步模式、L/F領(lǐng)導(dǎo)者與跟隨者模式。
HS/HA半同步/半異步模式:
半同步/半異步模式又稱為生產(chǎn)者消費(fèi)者模式,是比較常見的實(shí)現(xiàn)方式,比較簡單。分為同步層、隊(duì)列層、異步層三層。同步層的主線程處理工作任務(wù)并存入工作隊(duì)列,工作線程從工作隊(duì)列取出任務(wù)進(jìn)行處理,如果工作隊(duì)列為空,則取不到任務(wù)的工作線程進(jìn)入掛起狀態(tài)。由于線程間有數(shù)據(jù)通信,因此不適于大數(shù)據(jù)量交換的場合。
L/F領(lǐng)導(dǎo)者與跟隨者模式:
領(lǐng)導(dǎo)者跟隨者模式,在線程池中的線程可處在3種狀態(tài)之一:領(lǐng)導(dǎo)者leader、追隨者follower或工作者processor。任何時刻線程池只有一個領(lǐng)導(dǎo)者線程。事件到達(dá)時,領(lǐng)導(dǎo)者線程負(fù)責(zé)消息分離,并從處于追隨者線程中選出一個來當(dāng)繼任領(lǐng)導(dǎo)者,然后將自身設(shè)置為工作者狀態(tài)去處置該事件。處理完畢后工作者線程將自身的狀態(tài)置為追隨者。這一模式實(shí)現(xiàn)復(fù)雜,但避免了線程間交換任務(wù)數(shù)據(jù),提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了領(lǐng)導(dǎo)者跟隨者模式實(shí)現(xiàn)。
線程池的伸縮性對性能有較大的影響。
- 創(chuàng)建太多線程,將會浪費(fèi)一定的資源,有些線程未被充分使用。
- 銷毀太多線程,將導(dǎo)致之后浪費(fèi)時間再次創(chuàng)建它們。
- 創(chuàng)建線程太慢,將會導(dǎo)致長時間的等待,性能變差。
- 銷毀線程太慢,導(dǎo)致其它線程資源饑餓。
在面向?qū)ο缶幊讨?,?chuàng)建和銷毀對象是很費(fèi)時間的,因?yàn)閯?chuàng)建一個對象要獲取內(nèi)存資源或者其它更多資源。在Java中更是如此,虛擬機(jī)將試圖跟蹤每一個對象,以便能夠在對象銷毀后進(jìn)行垃圾回收。所以提高服務(wù)程序效率的一個手段就是盡可能減少創(chuàng)建和銷毀對象的次數(shù),特別是一些很耗資源的對象創(chuàng)建和銷毀。如何利用已有對象來服務(wù)就是一個需要解決的關(guān)鍵問題,其實(shí)這就是一些""池化資源""技術(shù)產(chǎn)生的原因。比如大家所熟悉的數(shù)據(jù)庫連接池正是遵循這一思想而產(chǎn)生的,本文將介紹的線程池技術(shù)同樣符合這一思想。
三、線程池的設(shè)計思路
首先我們根據(jù)上述已經(jīng)了解了線程和線程池創(chuàng)建目的以及作用。讓我們自己思考一下,如果是自己的業(yè)務(wù)上要用到大量的請求或者是查詢處理,而我們只能的機(jī)器并不能一下就接受這么多的task涌入計算,這將消耗我們計算機(jī)大量資源。這時我們就該創(chuàng)建線程池來對線程進(jìn)行管理,我們可以給線程預(yù)留一定的空間,讓請求逐個進(jìn)入線程處理,當(dāng)請求超過我們給的線程數(shù)量時,等一個線程跑完了再跑下一個,這樣就不會造成資源的浪費(fèi)和達(dá)到資源重復(fù)利用。
那么我們建立線程池的思路就有一下幾點(diǎn):
- 控制線程,給予每個線程任務(wù)保證線程正常運(yùn)行。
- 限制線程數(shù)量,保證系統(tǒng)有足夠的運(yùn)行空間。
- 資源復(fù)用,保證每個線程運(yùn)行完成任務(wù)后能再度利用。
- 控制運(yùn)行時間,線程運(yùn)行超過一定時間后停止任務(wù)轉(zhuǎn)接下個任務(wù),防止線程堵塞。
有了這些思路,我們就可以充分利用Python自帶的庫來構(gòu)建線程池了。
四、Python線程池構(gòu)建
1.構(gòu)建思路
第一步,我們需要在線程池里面創(chuàng)建出很多個線程。第二步,當(dāng)?shù)玫揭粋€請求時候,就使用一個線程來運(yùn)行·它。第三步,若多個任務(wù)則分配多個線程來運(yùn)行。當(dāng)其中一個線程運(yùn)行完它的任務(wù)之后,將再次進(jìn)行下一個任務(wù)使用。
2.實(shí)現(xiàn)庫功能函數(shù)
首先python標(biāo)準(zhǔn)庫里面是有threading庫的,但是該庫并沒有線程池這個模塊。要快速構(gòu)建線程池,可以利用concurrent.futures,該庫提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實(shí)現(xiàn)了對threading和multiprocessing的進(jìn)一步抽象。
這里我們只討論ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
這里我們可以看JAVA關(guān)于線程池的設(shè)計:
構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)量 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,// ? ? 最大線程數(shù) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, // ? ? ? 最大空閑時間 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit, ? ? ? ? // ? ? ? ?時間單位 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ? // ? 任務(wù)隊(duì)列 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory, ? ?// 線程工廠 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler ?// ?飽和處理機(jī)制 ?? ?)? { ... }
參數(shù)和Python創(chuàng)建線程池是一樣的,python創(chuàng)建線程池:
#encoding:utf-8 from concurrent.futures import ThreadPoolExecutor import threading #創(chuàng)建一個包含2條線程的線程池 pool = ThreadPoolExecutor(max_workers = 2) ?#定義兩個線程
這樣就建立了一條簡單的線程池,其中最大線程數(shù)為2 .
def task(i): ? ? sleep_seconds = random.randint(1, 3) ? ?#隨機(jī)睡眠時間 ? ? print('線程名稱:%s,參數(shù):%s,睡眠時間:%s' % (threading.current_thread().name, i, sleep_seconds)) ? ? time.sleep(sleep_seconds) ? #定義睡眠時間 for i in range(10):#創(chuàng)建十個任務(wù) ? ? future1 = pool.submit(task, i)
ThreadPoolExecutor()
構(gòu)造線程池實(shí)例,傳入max_workers可以設(shè)置線程池中最多能同時運(yùn)行的線程數(shù)目
submit()
提交線程需要執(zhí)行的任務(wù)(函數(shù)名和參數(shù))到線程池中,立刻返回一個future對象。
result()
取task的執(zhí)行結(jié)果:
cancel()
取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會取消該任務(wù),并返回 True。
調(diào)高點(diǎn)睡眠時間:
cancelled()
返回 Future 代表的線程任務(wù)是否被成功取消。
for i in range(5):#創(chuàng)建十個線程 ? ? future1 = pool.submit(task, i) ? ? print(future1.cancelled())
running()
for i in range(5):#創(chuàng)建十個線程 ? ? future1 = pool.submit(task, i) ? ? print(future1.running())
as_completed()
as_completed()方法是一個生成器,在沒有任務(wù)完成的時候,會阻塞,在有某個任務(wù)完成的時候,會yield這個任務(wù),就能執(zhí)行for循環(huán)下面的語句,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束。從結(jié)果也可以看出,先完成的任務(wù)會先通知主線程。
map()
除了submit,ThreadPoolExecutor還提供了map函數(shù)來添加線程,與常規(guī)的map類似,區(qū)別在于線程池的 map() 函數(shù)會為 iterables 的每個元素啟動一個線程,以并發(fā)方式來執(zhí)行 func 函數(shù). 同時,使用map函數(shù),還會自動獲取返回值。
#向線程池提交5個任務(wù) x = np.arange(5) for i in pool.map(task,x): ? ? ? print('successful')
到此這篇關(guān)于Python線程池(thread pool)創(chuàng)建使用及實(shí)例代碼分享的文章就介紹到這了,更多相關(guān)Python線程池創(chuàng)建內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用 Python 實(shí)現(xiàn)多任務(wù)進(jìn)程
這篇文章主要介紹如何利用 Python 實(shí)現(xiàn)多任務(wù)進(jìn)程,正在執(zhí)行的程序,由程序、數(shù)據(jù)和進(jìn)程控制塊組成,是正在執(zhí)行的程序,程序的一次執(zhí)行過程,是資源調(diào)度的基本單位。下面就來詳細(xì)介紹改內(nèi)容,需要的朋友可以參考一下2021-10-10Python腳本實(shí)現(xiàn)網(wǎng)卡流量監(jiān)控
這篇文章主要介紹了Python腳本實(shí)現(xiàn)網(wǎng)卡流量監(jiān)控,本文直接給出實(shí)現(xiàn)代碼,需要的朋友可以參考下2015-02-02pytorch中Tensor.to(device)和model.to(device)的區(qū)別及說明
這篇文章主要介紹了pytorch中Tensor.to(device)和model.to(device)的區(qū)別及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07