Python 多線程超詳細(xì)到位總結(jié)
在實(shí)際處理數(shù)據(jù)時(shí),因系統(tǒng)內(nèi)存有限,我們不可能一次把所有數(shù)據(jù)都導(dǎo)出進(jìn)行操作,所以需要批量導(dǎo)出依次操作。為了加快運(yùn)行,我們會(huì)采用多線程的方法進(jìn)行數(shù)據(jù)處理,以下為我總結(jié)的多線程批量處理數(shù)據(jù)的模板:
import threading # 從數(shù)據(jù)庫提取數(shù)據(jù)的類 class Scheduler(): def __init__(self): self._lock = threading.RLock() self.start = 0 # 每次取10000條數(shù)據(jù) self.step = 10000 def getdata(self): # 上鎖,以免多線程同時(shí)對(duì)數(shù)據(jù)庫進(jìn)行訪問,取出重復(fù)數(shù)據(jù) self._lock.acquire() # 進(jìn)行取數(shù)據(jù)操作 data = 'select * from table' \ 'where id between self.start and self.start + self.step' # 取完數(shù)據(jù)后,指針后移 self.start += self.step self._lock.release() return data # 處理數(shù)據(jù)的過程寫在這里 def processdata(): # 從該實(shí)例中提取數(shù)據(jù) data = scheduler.getdata() while data: # 進(jìn)行處理數(shù)據(jù)的具體操作: # 去重、補(bǔ)缺、運(yùn)算...只要還有數(shù)據(jù),本線程就繼續(xù)取新數(shù)據(jù) # 然后再獲取數(shù)據(jù),進(jìn)行循環(huán) data = scheduler.getdata() # 創(chuàng)建多線程,threads_num為創(chuàng)建的線程數(shù) def threads_scheduler(threads_num): threads = [] for i in range(threads_num): # 創(chuàng)建線程 td = threading.Thread(target=processdata, name='th'+str(i+1)) threads.append(td) for t in threads: # 啟動(dòng)線程 t.start() for t in threads: # 子線程守護(hù) t.join() print('數(shù)據(jù)已全部處理成功') if __name__=='__main__': # 實(shí)例化一個(gè)調(diào)度器,初始化參數(shù) scheduler = Scheduler() # 創(chuàng)建線程,開始處理數(shù)據(jù) threads_scheduler(4)
主要分為三大部分:
- Scheduler類,負(fù)責(zé)初始化參數(shù),getdata方法負(fù)責(zé)提取數(shù)據(jù)
- processdata方法中寫具體處理數(shù)據(jù)的流程
- threads_scheduler方法負(fù)責(zé)創(chuàng)建線程
Python多線程的知識(shí)我分為4部分進(jìn)行講解,以下帶大家來回顧重點(diǎn):
多線程threading
本章先為大家介紹了線程的相關(guān)概念:
主線程:當(dāng)一個(gè)程序啟動(dòng)時(shí),就有一個(gè)進(jìn)程被操作系統(tǒng)(OS)創(chuàng)建,與此同時(shí)一個(gè)線程也立刻運(yùn)行,該線程通常叫做程序的主線程(Main Thread)。因?yàn)樗浅绦蜷_始時(shí)就執(zhí)行的,如果你需要再創(chuàng)建線程,那么創(chuàng)建的線程就是這個(gè)主線程的子線程。
子線程:使用threading、ThreadPoolExecutor創(chuàng)建的線性均為子線程。
主線程的重要性體現(xiàn)在兩方面:1.是產(chǎn)生其他子線程的線程;2.通常它必須最后完成執(zhí)行,比如執(zhí)行各種關(guān)閉動(dòng)作。
在飛車程序中,如果沒有多線程,我們就不能一邊聽歌一邊玩飛車,聽歌與玩游戲不能并行;在使用多線程后,我們就可以在玩游戲的同時(shí)聽背景音樂。在這個(gè)例子中啟動(dòng)飛車程序就是一個(gè)進(jìn)程,玩游戲和聽音樂是兩個(gè)線程。
Python提供了threading模塊來實(shí)現(xiàn)多線程:threading.Thread可以創(chuàng)建線程;setDaemon(True)為守護(hù)主線程,默認(rèn)為False;join()為守護(hù)子線程。
from time import sleep import threading def music(music_name): for i in range(2): print('正在聽{}'.format(music_name)) sleep(1) print('music over') def game(game_name): for i in range(2): print('正在玩{}'.format(game_name)) sleep(3) print('game over') threads = [] t1 = threading.Thread(target=music,args=('稻香',)) threads.append(t1) t2 = threading.Thread(target=game,args=('飛車',)) threads.append(t2) if __name__ == '__main__': for t in threads: # t.setDaemon(True) t.start() for t in threads: t.join() print('主線程運(yùn)行結(jié)束')
線程池
因?yàn)樾陆ň€程系統(tǒng)需要分配資源、終止線程系統(tǒng)需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時(shí),使用線程池的語法比自己新建線程執(zhí)行線程更加簡潔。
Python為我們提供了ThreadPoolExecutor來實(shí)現(xiàn)線程池,此線程池默認(rèn)子線程守護(hù)。它的適應(yīng)場景為突發(fā)性大量請求或需要大量線程完成任務(wù),但實(shí)際任務(wù)處理時(shí)間較短。
from time import sleep # fun為定義的待運(yùn)行函數(shù) with ThreadPoolExecutor(max_workers=5) as executor: ans = executor.map(fun, [遍歷值]) for res in ans: print(res) with ThreadPoolExecutor(max_workers=5) as executor: list = [遍歷值] ans = [executor.submit(fun, i) for i in list] for res in as_completed(ans): print(res.result())
其中max_workers為線程池中的線程個(gè)數(shù),常用的遍歷方法有map和submit+as_completed。根據(jù)業(yè)務(wù)場景的不同,若我們需要輸出結(jié)果按遍歷順序返回,我們就用map方法,若想誰先完成就返回誰,我們就用submit+as_complete方法。
線程互斥
我們把一個(gè)時(shí)間段內(nèi)只允許一個(gè)線程使用的資源稱為臨界資源,對(duì)臨界資源的訪問,必須互斥的進(jìn)行?;コ猓卜Q間接制約關(guān)系。線程互斥指當(dāng)一個(gè)線程訪問某臨界資源時(shí),另一個(gè)想要訪問該臨界資源的線程必須等待。當(dāng)前訪問臨界資源的線程訪問結(jié)束,釋放該資源之后,另一個(gè)線程才能去訪問臨界資源。鎖的功能就是實(shí)現(xiàn)線程互斥。
我把線程互斥比作廁所包間上大號(hào)的過程,因?yàn)榘g里只有一個(gè)坑,所以只允許一個(gè)人進(jìn)行大號(hào)。當(dāng)?shù)谝粋€(gè)人要上廁所時(shí),會(huì)將門上上鎖,這時(shí)如果第二個(gè)人也想大號(hào),那就必須等第一個(gè)人上完,將鎖解開后才能進(jìn)行,在這期間第二個(gè)人就只能在門外等著。這個(gè)過程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。
Python 的 threading 模塊引入了鎖。threading 模塊提供了 Lock 類,它有如下方法加鎖和釋放鎖:
- acquire():對(duì) Lock加鎖,其中timeout參數(shù)指定加鎖多少秒
- release():釋放鎖
class Account: def __init__(self, card_id, balance): # 封裝賬戶ID、賬戶余額的兩個(gè)變量 self.card_id= card_id self.balance = balance def withdraw(account, money): # 進(jìn)行加鎖 lock.acquire() # 賬戶余額大于取錢數(shù)目 if account.balance >= money: # 吐出鈔票 print(threading.current_thread().name + "取錢成功!吐出鈔票:" + str(money),end=' ') # 修改余額 account.balance -= money print("\t余額為: " + str(account.balance)) else: print(threading.current_thread().name + "取錢失??!余額不足") # 進(jìn)行解鎖 lock.release() # 創(chuàng)建一個(gè)賬戶,銀行卡id為8888,存款1000元 acct = Account("8888" , 1000) # 模擬兩個(gè)對(duì)同一個(gè)賬戶取錢 # 在主線程中創(chuàng)建一把鎖 lock = threading.Lock() threading.Thread(name='窗口A', target=withdraw , args=(acct , 800)).start() threading.Thread(name='窗口B', target=withdraw , args=(acct , 800)).start()
lock與Rlock的區(qū)別
區(qū)別一:Lock被稱為原始鎖,一個(gè)線程只能請求一次;RLock被稱為重入鎖,可以被一個(gè)線程請求多次,即鎖中可以嵌套鎖。
import threading def main(): lock.acquire() print('第一道鎖') lock.acquire() print('第二道鎖') lock.release() lock.release() if __name__ == '__main__': lock = threading.Lock() main()
我們會(huì)發(fā)現(xiàn)這個(gè)程序只會(huì)打印“第一道鎖”,而且程序既沒有終止,也沒有繼續(xù)運(yùn)行。這是因?yàn)長ock鎖在同一線程內(nèi)第一次加鎖之后還沒有釋放時(shí),就進(jìn)行了第二次acquire請求,導(dǎo)致無法執(zhí)行release,所以鎖永遠(yuǎn)無法釋放,這就是死鎖。如果我們使用RLock就能正常運(yùn)行,不會(huì)發(fā)生死鎖的狀態(tài)。
區(qū)別二:當(dāng)Lock處于鎖定狀態(tài)時(shí),不屬于特定線程,可在另一個(gè)線程中進(jìn)行解鎖釋放;而RLock只有當(dāng)前線程才能釋放本線程上的鎖,不可由其他線程進(jìn)行釋放,所以在使用RLock時(shí),acquire與release必須成對(duì)出現(xiàn),即解鈴還須系鈴人。
import threading def main(): lock.release() print("在子線程解鎖后打印") if __name__ == '__main__': lock = threading.Lock() lock.acquire() t = threading.Thread(target=main) t.start()
在主線程中定義Lock鎖,然后上鎖,再創(chuàng)建一個(gè)子線程t運(yùn)行main函數(shù)釋放鎖,結(jié)果正常輸出,說明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為RLock則報(bào)錯(cuò)。在實(shí)際中設(shè)計(jì)程序時(shí),我們會(huì)將每個(gè)功能分別封裝成一個(gè)函數(shù),每個(gè)函數(shù)中都可能會(huì)有臨界區(qū)域,所以就需要用到RLock。
import threading import time def fun_1(): print('開始') time.sleep(1) lock.acquire() print("第一道鎖") fun_2() lock.release() def fun_2(): lock.acquire() print("第二道鎖") lock.release() if __name__ == '__main__': lock = threading.RLock() t1 = threading.Thread(target=fun_1) t2 = threading.Thread(target=fun_1) t1.start() t2.start()
一句話總結(jié)就是Lock不能套娃,RLock可以套娃;Lock可以由其他線程中的鎖進(jìn)行操作,RLock只能由本線程進(jìn)行操作。
以上就是多線程所有內(nèi)容,喜歡的小伙伴支持,收藏。
技術(shù)交流
歡迎轉(zhuǎn)載、收藏、有所收獲點(diǎn)贊支持一下!
到此這篇關(guān)于Python 多線程超詳細(xì)到位總結(jié)的文章就介紹到這了,更多相關(guān)Python 多線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)公歷(陽歷)轉(zhuǎn)農(nóng)歷(陰歷)的方法示例
這篇文章主要介紹了Python實(shí)現(xiàn)公歷(陽歷)轉(zhuǎn)農(nóng)歷(陰歷)的方法,涉及農(nóng)歷算法原理及Python日期運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-08-08Python使用百度通用API進(jìn)行翻譯實(shí)現(xiàn)
本文主要介紹了Python使用百度通用API進(jìn)行翻譯實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02python格式的Caffe圖片數(shù)據(jù)均值計(jì)算學(xué)習(xí)
這篇文章主要為大家介紹了python格式的Caffe圖片數(shù)據(jù)均值計(jì)算學(xué)習(xí)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Python進(jìn)階之@property動(dòng)態(tài)屬性的實(shí)現(xiàn)
這篇文章主要介紹了Python進(jìn)階之@property動(dòng)態(tài)屬性的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04Pandas數(shù)據(jù)分析多文件批次聚合處理實(shí)例解析
這篇文章主要為大家介紹了Pandas數(shù)據(jù)分析多文件批次聚合處理實(shí)例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02