Python多線程threading模塊實(shí)例詳解
什么是多線程
線程(thread)是操作系統(tǒng)中能夠進(jìn)行運(yùn)算的最小單位,包含于進(jìn)程之中,一個(gè)進(jìn)程可以有多個(gè)線程,這意味著一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,即為多線程。
對(duì)于一個(gè)python程序,如果需要同時(shí)大量處理多個(gè)任務(wù),有使用多進(jìn)程和多線程兩種方法。在python中,實(shí)現(xiàn)多線程主要通過threading模塊,而多進(jìn)程主要通過multiprocessing模塊。
這兩個(gè)模塊的主要區(qū)別是:threading模塊基于線程,而multiprocessing模塊基于進(jìn)程。threading模塊使用共享內(nèi)存來實(shí)現(xiàn)多線程,所有線程都共享一樣的變量(這點(diǎn)在后續(xù)的實(shí)例中可以感受到);而multiprocessing基于子進(jìn)程,每個(gè)子進(jìn)程之間都有獨(dú)立的變量和數(shù)據(jù)結(jié)構(gòu)。兩者的區(qū)別意味著threading更使用于I/O密集型任務(wù)(例如需要進(jìn)行多表格讀取操作),multiprocessing模塊更適用于包含較多計(jì)算的CPU密集型任務(wù)(矩陣運(yùn)算,圖片處理類任務(wù))。
需要注意的是,由于python中的GIL鎖的存在,Python解釋器只允許一個(gè)Python進(jìn)程使用,這意味著對(duì)于一個(gè)解釋器只允許一個(gè)進(jìn)程在運(yùn)行,這也是為什么threading模塊無法適用于CPU密集型這類需要大量CPU資源的任務(wù),因?yàn)橐粋€(gè)進(jìn)程的CPU資源有限,無論開啟多少個(gè)線程,總的資源就只有那些,總耗時(shí)不會(huì)有太大變化。而multiprocessing模塊則可以開多個(gè)進(jìn)程,能夠更快速的處理CPU密集型任務(wù)。
關(guān)于GIL鎖和Multiprocessing模塊的部分就不繼續(xù)深入介紹了,本次主要介紹如何使用threading模塊實(shí)現(xiàn)多線程的相關(guān)內(nèi)容。
線程完整生命周期
一個(gè)線程完整的生命周期包括新建——就緒——運(yùn)行——阻塞——死亡。
- 新建:即新創(chuàng)建一個(gè)線程對(duì)象
- 就緒:調(diào)用start方法后,線程對(duì)象等待運(yùn)行,什么時(shí)候開始運(yùn)行取決于調(diào)度
- 運(yùn)行:線程處于運(yùn)行狀態(tài)
- 阻塞:處于運(yùn)行狀態(tài)的線程被堵塞,通俗理解就是被卡住了,可能的原因包括但不限于程序自身調(diào)用sleep方法阻塞線程運(yùn)行,或調(diào)用了一個(gè)阻塞式I/O方法,被阻塞的進(jìn)程會(huì)等待何時(shí)解除阻塞重新運(yùn)行
- 死亡:線程執(zhí)行完畢或異常退出,線程對(duì)象被銷毀并釋放內(nèi)存
主線程與子線程
我們講的多線程實(shí)際上指的就是只在主線程中運(yùn)行多個(gè)子線程,而主線程就是我們的python編譯器執(zhí)行的線程,所有子線程和主線程都同屬于一個(gè)進(jìn)程。在未添加子線程的情況下,默認(rèn)就只有一個(gè)主線程在運(yùn)行,他會(huì)將我們寫的代碼從開頭到結(jié)尾執(zhí)行一遍,后文中我們也會(huì)提到一些主線程與子線程的關(guān)系。
不扯那么多概念了,接下來直接進(jìn)入正題!
實(shí)例1-直接使用Thread創(chuàng)建線程對(duì)象
Thread類創(chuàng)建新線程的基本語法如下:
Newthread= Thread(target=function, args=(argument1,argument2,...))
- Newthread: 創(chuàng)建的線程對(duì)象
- function: 要執(zhí)行的函數(shù)
- argument1,argument2: 傳遞給線程函數(shù)的參數(shù),為tuple類型
假設(shè)一個(gè)任務(wù)task(當(dāng)然task可以替換為其他任何任務(wù),本實(shí)例中僅為假設(shè)),這個(gè)任務(wù)實(shí)現(xiàn)的功能是每隔1s打印某個(gè)字母,我們使用兩個(gè)子線程,分別同時(shí)打印不同的字母a和b,實(shí)例如下:
""" <case1: 直接使用threading中的Thread類創(chuàng)建線程> """ from threading import Thread import time from time import sleep # 自定義的函數(shù),可以替換成其他任何函數(shù) def task(threadName, number, letter): print(f"【線程開始】{threadName}") m = 0 while m < number: sleep(1) m += 1 current_time = time.strftime('%H:%M:%S', time.localtime()) print(f"[{current_time}] {threadName} 輸出 {letter}") print(f"【線程結(jié)束】{threadName}") thread1 = Thread(target=task, args=("thread_1", 4, "a")) # 線程1:執(zhí)行任務(wù)打印4個(gè)a thread2 = Thread(target=task, args=("thread_2", 2, "b")) # 線程2:執(zhí)行任務(wù)打印2個(gè)b thread1.start() # 線程1開始 thread2.start() # 線程2開始 thread1.join() # 等待線程1結(jié)束 thread2.join() # 等待線程2結(jié)束
其輸出為:
【線程開始】thread_1
【線程開始】thread_2
[13:42:00] thread_1 輸出 a
[13:42:00] thread_2 輸出 b
[13:42:01] thread_1 輸出 a
[13:42:01] thread_2 輸出 b
【線程結(jié)束】thread_2
[13:42:02] thread_1 輸出 a
[13:42:03] thread_1 輸出 a
【線程結(jié)束】thread_1
線程thread1和thread2同時(shí)開始,thread2打印2個(gè)b后結(jié)束,而thread1繼續(xù)打印a直到完成。
實(shí)例2-使用join阻塞線程
在前一個(gè)實(shí)例中我們可以看到在結(jié)尾有thread1.join()和thread2.join()兩個(gè)語句,這兩個(gè)語句出現(xiàn)在末尾表示主線程會(huì)等待所有的子線程執(zhí)行完成,當(dāng)然了,由于默認(rèn)我們創(chuàng)建的子線程是前臺(tái)線程(這個(gè)概念會(huì)在后面提到),如果沒有join語句主線程也會(huì)等待所有子線程執(zhí)行完畢才退出。
join方法可以用于阻塞主線程的順序執(zhí)行,因此,在主線程中使用可以調(diào)整各個(gè)子線程的執(zhí)行順序,了解完這些之后,我們來看下一個(gè)實(shí)例。
""" <case2: 使用join方法阻塞進(jìn)程> """ from threading import Thread import time from time import sleep # 自定義的函數(shù),可以替換成其他任何函數(shù) def task(threadName, number, letter): print(f"【線程開始】{threadName}") m = 0 while m < number: sleep(1) m += 1 current_time = time.strftime('%H:%M:%S', time.localtime()) print(f"[{current_time}] {threadName} 輸出 {letter}") print(f"【線程結(jié)束】{threadName}") thread1 = Thread(target=task, args=("thread_1", 6, "a")) # 線程1:假設(shè)任務(wù)為打印6個(gè)a thread2 = Thread(target=task, args=("thread_2", 4, "b")) # 線程2:假設(shè)任務(wù)為打印4個(gè)b thread3 = Thread(target=task, args=("thread_3", 2, "c")) # 線程3:假設(shè)任務(wù)為打印2個(gè)c thread1.start() # 線程1啟動(dòng) thread2.start() # 任務(wù)2啟動(dòng) thread2.join() # 等待線程2 thread3.start() # 線程2完成任務(wù)后線程3才啟動(dòng) thread1.join() # 等待線程1完成線程 thread3.join() # 等待線程3完成線程
其輸出為:
【線程開始】thread_1
【線程開始】thread_2
[13:44:20] thread_2 輸出 b
[13:44:20] thread_1 輸出 a
[13:44:21] thread_2 輸出 b
[13:44:21] thread_1 輸出 a
[13:44:22] thread_2 輸出 b
[13:44:22] thread_1 輸出 a
[13:44:23] thread_2 輸出 b
【線程結(jié)束】thread_2
[13:44:23] thread_1 輸出 a
【線程開始】thread_3
[13:44:24] thread_3 輸出 c
[13:44:24] thread_1 輸出 a
[13:44:25] thread_1 輸出 a
[13:44:25] thread_3 輸出 c
【線程結(jié)束】thread_3
【線程結(jié)束】thread_1
由輸出可以看出,由于join的加入,thread2.join使得主進(jìn)程一直在等待thread2線程完成任務(wù),因此直到線程thread2結(jié)束后,thread3才開始任務(wù)。
由于這里thread1一共打印6個(gè)a,thread2打印4個(gè)b,thread3打印2個(gè)c。thread1的工作量等于thread2+thread3的工作量之和,因此整個(gè)程序可以看成是thread1與thread2+thread3并行運(yùn)行。
實(shí)例3-重寫父類threading.Thread創(chuàng)建線程
實(shí)例1和2中,我們已經(jīng)介紹了如何直接導(dǎo)入Thread函數(shù)創(chuàng)建線程以及如何利用join方法,但是這種創(chuàng)建線程的方法本質(zhì)上使用的是其父類的默認(rèn)設(shè)置,具有局限性。在實(shí)例3中,將進(jìn)一步深入探討如何繼承并重寫父類threading.Thread類創(chuàng)建子線程。
和實(shí)例2相同,我們假設(shè)需要用多個(gè)線程處理任務(wù)task1,thread1打印4個(gè)a字母(耗時(shí)4s),thread2線程打印2個(gè)b字母(耗時(shí)2s),如下:
""" <case3: 重寫父類threading.Thread創(chuàng)建線程> """ import threading import time from time import sleep # myThread繼承父類,并進(jìn)行重寫 class myThread(threading.Thread): # 重寫父類的構(gòu)造函數(shù) def __init__(self, number, letter): threading.Thread.__init__(self) self.number = number # 添加number變量 self.letter = letter # 添加letter變量 # 重寫父類中的run函數(shù) def run(self): print(f"【線程開始】{self.name}") task1(self.name, self.number, self.letter) print("【線程結(jié)束】", self.name) # 重寫父類析構(gòu)函數(shù) def __del__(self): print("【線程銷毀釋放內(nèi)存】", self.name) # 自定義的函數(shù),此處可以替換成任何其他想要多線程執(zhí)行的任務(wù) def task1(threadName, number, letter): m = 0 while m < number: sleep(1) m += 1 current_time = time.strftime('%H:%M:%S', time.localtime()) print(f"[{current_time}] {threadName} 輸出 {letter}") # def task2... # def task3... thread1 = myThread(4, "a") # 創(chuàng)建線程thread1:任務(wù)耗時(shí)2s thread2 = myThread(2, "b") # 創(chuàng)建線程thread2:任務(wù)耗時(shí)4s thread1.start() # 啟動(dòng)線程1 thread2.start() # 啟動(dòng)線程2 thread1.join() # 等待線程1 thread2.join() # 等待線程2
輸出為:
【線程開始】Thread-1
【線程開始】Thread-2
[10:37:58] Thread-1 輸出 a
[10:37:58] Thread-2 輸出 b
[10:37:59] Thread-1 輸出 a
[10:37:59] Thread-2 輸出 b
【線程結(jié)束】 Thread-2
[10:38:00] Thread-1 輸出 a
[10:38:01] Thread-1 輸出 a
【線程結(jié)束】 Thread-1
【線程銷毀釋放內(nèi)存】 Thread-1
【線程銷毀釋放內(nèi)存】 Thread-2
從輸出中,我們可以清楚的看到兩個(gè)并行任務(wù)從開始到結(jié)束,最后一起銷毀并釋放內(nèi)存的全過程,很好的體現(xiàn)了線程的一個(gè)完整生命周期過程。
最后實(shí)現(xiàn)的效果與實(shí)例1實(shí)現(xiàn)的效果相同,但是使用繼承重寫父類的方法,可以讓我們更加自由的定義各項(xiàng)參數(shù)以及定義線程處理的任務(wù),也能讓我們對(duì)threading模塊的理解更加深入。
實(shí)例4-前臺(tái)線程與后臺(tái)線程(守護(hù)線程)
在前面的所有實(shí)例中,我們忽略了threading.Thread的daemon參數(shù),其默認(rèn)為False,表示線程默認(rèn)就是一個(gè)前臺(tái)線程。
前臺(tái)線程表示當(dāng)所有的前臺(tái)線程都執(zhí)行完畢時(shí),整個(gè)程序才退出。將daemon參數(shù)設(shè)定為True是表示線程是一個(gè)后臺(tái)線程,此時(shí)主進(jìn)程結(jié)束時(shí),所有未執(zhí)行完成的后臺(tái)線程也都會(huì)直接自動(dòng)結(jié)束。
在上一個(gè)實(shí)例的基礎(chǔ)上,在初始化部分加入self.daemon=True,并去掉末尾的join方法,替換成sleep方法來阻塞主程序的運(yùn)行,我們來看看結(jié)果會(huì)變成什么樣,實(shí)例如下:
""" <case4: 前臺(tái)線程與后臺(tái)線程> """ import threading import time from time import sleep # myThread繼承父類,并進(jìn)行重寫 class myThread(threading.Thread): # 重寫父類的構(gòu)造函數(shù) def __init__(self, number, letter): threading.Thread.__init__(self) self.number = number # 添加number變量 self.letter = letter # 添加letter變量 self.daemon = True # 默認(rèn)前臺(tái)線程 # 重寫父類中的run函數(shù) def run(self): print(f"【線程開始】{self.name}") task1(self.name, self.number, self.letter) print("【線程結(jié)束】", self.name) # 重寫父類析構(gòu)函數(shù) def __del__(self): print("【線程銷毀釋放內(nèi)存】", self.name) # 自定義的函數(shù),此處可以替換成任何其他想要多線程執(zhí)行的任務(wù) def task1(threadName, number, letter): m = 0 while m < number: sleep(1) m += 1 current_time = time.strftime('%H:%M:%S', time.localtime()) print(f"[{current_time}] {threadName} 輸出 {letter}") # def task2... # def task3... thread1 = myThread(4, "a") # 創(chuàng)建線程thread1:假設(shè)任務(wù)耗時(shí)2s thread2 = myThread(2, "b") # 創(chuàng)建線程thread2:假設(shè)任務(wù)耗時(shí)4s thread1.start() # 啟動(dòng)線程1 thread2.start() # 啟動(dòng)線程2 time.sleep(3) # 主程序等待3s再繼續(xù)執(zhí)行
其輸出將變?yōu)椋?/p>
【線程開始】Thread-1
【線程開始】Thread-2
[10:31:45] Thread-1 輸出 a
[10:31:45] Thread-2 輸出 b
[10:31:46] Thread-1 輸出 a
[10:31:46] Thread-2 輸出 b
【線程結(jié)束】 Thread-2
Process finished with exit code 0
我們用sleep方法強(qiáng)行阻塞了主程序3s,但是由于我們將線程設(shè)定為了后臺(tái)線程,3s過后,主程序?qū)?zhí)行完畢,此時(shí)兩個(gè)子線程thread1和thread2無論是否執(zhí)行完成,都將強(qiáng)行結(jié)束。
將daemon參數(shù)設(shè)定為False,其輸出則與實(shí)例3相同,如下:
【線程開始】Thread-1
【線程開始】Thread-2
[10:30:14] Thread-1 輸出 a
[10:30:14] Thread-2 輸出 b
[10:30:15] Thread-1 輸出 a
[10:30:15] Thread-2 輸出 b
【線程結(jié)束】 Thread-2
[10:30:16] Thread-1 輸出 a
[10:30:17] Thread-1 輸出 a
【線程結(jié)束】 Thread-1
【線程銷毀釋放內(nèi)存】 Thread-1
【線程銷毀釋放內(nèi)存】 Thread-2
實(shí)例5-線程同步(線程鎖)
我們?cè)O(shè)想一下這種情況,當(dāng)多線程同時(shí)執(zhí)行時(shí),由于threading模塊的中線程的變量和數(shù)據(jù)結(jié)構(gòu)共享,可能會(huì)出現(xiàn)多個(gè)線程同時(shí)修改一個(gè)數(shù)據(jù)的情況,這絕對(duì)是不行的。
為了將各個(gè)線程同步,我們引入線程鎖的概念。當(dāng)某個(gè)線程訪問數(shù)據(jù)時(shí),先對(duì)其加鎖,其他線程若再想訪問這個(gè)數(shù)據(jù)就會(huì)被阻塞,直到前一個(gè)線程解鎖釋放。在threading模塊中,加鎖和釋放鎖主要使用Lock類,使用其中的acquire()和release()方法:
Lock = threading.Lock() # 在threading模塊中獲得鎖類 Lock.acquire() # 設(shè)置鎖 Lock.release() # 釋放鎖
在介紹線程鎖實(shí)例時(shí),我們就不使用前面幾個(gè)實(shí)例用的打印字母的任務(wù)了。為了讓各位更加直觀地體會(huì)到線程鎖的作用,我們使用多線程對(duì)一個(gè)列表list進(jìn)行數(shù)據(jù)刪改。
假設(shè)此時(shí)有多個(gè)線程都需要對(duì)這個(gè)列表進(jìn)行修改操作,實(shí)例如下:
""" <case5: 線程同步,線程鎖> """ import threading import time # 子類myThread繼承父類threading.Thread,并進(jìn)行重寫 class myThread(threading.Thread): # 重寫父類構(gòu)造函數(shù) def __init__(self, number): threading.Thread.__init__(self) self.number = number # 重寫父類run函數(shù),在調(diào)用start()時(shí)自動(dòng)調(diào)用run函數(shù) def run(self): print(f"【線程開始】{self.name}") Lock.acquire() # 設(shè)置線程鎖 edit_list(self.name, self.number) Lock.release() # 釋放線程鎖 # 重寫父類析構(gòu)函數(shù) def __del__(self): print("【線程銷毀】", self.name) # 自定義的任務(wù)函數(shù) def edit_list(threadName, number): while number > 0: time.sleep(1) data_list[number-1] += 1 current_time = time.strftime('%H:%M:%S', time.localtime()) print(f"[{current_time}] {threadName} 修改datalist為{data_list}") number -= 1 print(f"【線程{threadName}完成工作】") data_list = [0, 0, 0, 0] Lock = threading.Lock() # 創(chuàng)建3個(gè)子線程 thread1 = myThread(1) thread2 = myThread(2) thread3 = myThread(3) # 啟動(dòng)3個(gè)子線程 thread1.start() thread2.start() thread3.start() # 主進(jìn)程等待所有線程完成 thread1.join() thread2.join() thread3.join() print("【主進(jìn)程結(jié)束】")
輸出為:
【線程開始】Thread-1
【線程開始】Thread-2
【線程開始】Thread-3
[09:55:22] Thread-1 修改datalist為[1, 0, 0, 0]
【線程Thread-1完成工作】
[09:55:23] Thread-2 修改datalist為[1, 1, 0, 0]
[09:55:24] Thread-2 修改datalist為[2, 1, 0, 0]
【線程Thread-2完成工作】
[09:55:25] Thread-3 修改datalist為[2, 1, 1, 0]
[09:55:26] Thread-3 修改datalist為[2, 2, 1, 0]
[09:55:27] Thread-3 修改datalist為[3, 2, 1, 0]
【線程Thread-3完成工作】
【主進(jìn)程結(jié)束】
【線程銷毀】 Thread-1
【線程銷毀】 Thread-2
【線程銷毀】 Thread-3
當(dāng)三個(gè)線程都需要使用同一個(gè)數(shù)據(jù)時(shí),我們只需要對(duì)線程的run方法中進(jìn)行加鎖和釋放鎖的操作即可。此時(shí)三個(gè)子線程將會(huì)進(jìn)行順序操作,前一個(gè)子線程執(zhí)行完成釋放鎖后,后一個(gè)線程才會(huì)繼續(xù)執(zhí)行。要注意的是,這三個(gè)子線程使用的需要是同一把鎖。
threading模塊還有很多可選參數(shù)和方法可供使用,詳情可參見threading模塊的官方文檔
點(diǎn)擊鏈接:threading --- Thread-based parallelism — Python 3.12.3 文檔
以上就是Python多線程threading模塊實(shí)例詳解的詳細(xì)內(nèi)容,更多關(guān)于Python threading模塊的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
深入理解Python虛擬機(jī)中的Code?obejct
在本篇文章當(dāng)中主要給大家深入介紹在?cpython?當(dāng)中非常重要的一個(gè)數(shù)據(jù)結(jié)構(gòu)?code?object!?我們簡單介紹了一下在?code?object?當(dāng)中有哪些字段以及這些字段的簡單含義,在本篇文章當(dāng)中將會(huì)舉一些例子以便更加深入理解這些字段2023-04-04Python利用Pandas進(jìn)行數(shù)據(jù)分析的方法詳解
Pandas是最流行的用于數(shù)據(jù)分析的?Python?庫。它提供高度優(yōu)化的性能。本文將利用Python進(jìn)行數(shù)據(jù)分析,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-09-09Python 時(shí)間操作例子和時(shí)間格式化參數(shù)小結(jié)
這篇文章主要介紹了Python 時(shí)間操作例子,例如取前幾天、后幾天、前一月、后一月等,需要的朋友可以參考下2014-04-04