詳解Python開(kāi)啟線程和線程池的方法
Python開(kāi)啟線程和線程池的方法
一.最佳線程數(shù)的獲?。?/h3>
1、通過(guò)用戶慢慢遞增來(lái)進(jìn)行性能壓測(cè),觀察QPS(即每秒的響應(yīng)請(qǐng)求數(shù),也即是最大吞吐能力。),響應(yīng)時(shí)間
2、根據(jù)公式計(jì)算:服務(wù)器端最佳線程數(shù)量=((線程等待時(shí)間+線程cpu時(shí)間)/線程cpu時(shí)間) * cpu數(shù)量
3、單用戶壓測(cè),查看CPU的消耗,然后直接乘以百分比,再進(jìn)行壓測(cè),一般這個(gè)值的附近應(yīng)該就是最佳線程數(shù)量。
二、為什么要使用線程池?
1.多線程中,線程的數(shù)量并非越多越好
2.節(jié)省每次開(kāi)啟線程的開(kāi)銷
三、如何實(shí)現(xiàn)線程池?
threadpool模塊
concurrent.futures
重寫(xiě)threadpool或者future的函數(shù)
vthread 模塊
1、過(guò)去:
使用threadpool模塊,這是個(gè)python的第三方模塊,支持python2和python3,具體使用方式如下:
#! /usr/bin/env python # -*- coding: utf-8 -*- import threadpool import time def sayhello (a): print("hello: "+a) time.sleep(2) def main(): global result seed=["a","b","c"] start=time.time() task_pool=threadpool.ThreadPool(5) requests=threadpool.makeRequests(sayhello,seed) for req in requests: task_pool.putRequest(req) task_pool.wait() end=time.time() time_m = end-start print("time: "+str(time_m)) start1=time.time() for each in seed: sayhello(each) end1=time.time() print("time1: "+str(end1-start1)) if __name__ == '__main__': main()
運(yùn)行結(jié)果如下:
threadpool是一個(gè)比較老的模塊了,現(xiàn)在雖然還有一些人在用,但已經(jīng)不再是主流了,關(guān)于python多線程,現(xiàn)在已經(jīng)開(kāi)始步入未來(lái)(future模塊)了
2、未來(lái):
使用concurrent.futures模塊,這個(gè)模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:
#! /usr/bin/env python # -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor import time def sayhello(a): print("hello: "+a) time.sleep(2) def main(): seed=["a","b","c"] start1=time.time() for each in seed: sayhello(each) end1=time.time() print("time1: "+str(end1-start1)) start2=time.time() with ThreadPoolExecutor(3) as executor: for each in seed: executor.submit(sayhello,each) end2=time.time() print("time2: "+str(end2-start2)) start3=time.time() with ThreadPoolExecutor(3) as executor1: executor1.map(sayhello,seed) end3=time.time() print("time3: "+str(end3-start3)) if __name__ == '__main__': main()
運(yùn)行結(jié)果如下:
注意到一點(diǎn):
concurrent.futures.ThreadPoolExecutor,在提交任務(wù)的時(shí)候,有兩種方式,一種是submit()函數(shù),另一種是map()函數(shù),兩者的主要區(qū)別在于:
2.1、map可以保證輸出的順序, submit輸出的順序是亂的
2.2、如果你要提交的任務(wù)的函數(shù)是一樣的,就可以簡(jiǎn)化成map。但是假如提交的任務(wù)函數(shù)是不一樣的,或者執(zhí)行的過(guò)程之可能出現(xiàn)異常(使用map執(zhí)行過(guò)程中發(fā)現(xiàn)問(wèn)題會(huì)直接拋出錯(cuò)誤)就要用到submit()
2.3、submit和map的參數(shù)是不同的,submit每次都需要提交一個(gè)目標(biāo)函數(shù)和對(duì)應(yīng)的參數(shù),map只需要提交一次目標(biāo)函數(shù),目標(biāo)函數(shù)的參數(shù)放在一個(gè)迭代器(列表,字典)里就可以。
3.現(xiàn)在?
這里要考慮一個(gè)問(wèn)題,以上兩種線程池的實(shí)現(xiàn)都是封裝好的,任務(wù)只能在線程池初始化的時(shí)候添加一次,那么,假設(shè)我現(xiàn)在有這樣一個(gè)需求,需要在線程池運(yùn)行時(shí),再往里面添加新的任務(wù)(注意,是新任務(wù),不是新線程),那么要怎么辦?
其實(shí)有兩種方式:
3.1、重寫(xiě)threadpool或者future的函數(shù):
這個(gè)方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實(shí)現(xiàn)機(jī)制才能正確的根據(jù)自己的需要重寫(xiě)其中的方法。
3.2、自己構(gòu)建一個(gè)線程池:
這個(gè)方法就需要對(duì)線程池的有一個(gè)清晰的了解了,附上我自己構(gòu)建的一個(gè)線程池:
#! /usr/bin/env python # -*- coding: utf-8 -*- #學(xué)習(xí)中遇到問(wèn)題沒(méi)人解答?小編創(chuàng)建了一個(gè)Python學(xué)習(xí)交流群:711312441 import threading import Queue import hashlib import logging from utils.progress import PrintProgress from utils.save import SaveToSqlite class ThreadPool(object): def __init__(self, thread_num, args): self.args = args self.work_queue = Queue.Queue() self.save_queue = Queue.Queue() self.threads = [] self.running = 0 self.failure = 0 self.success = 0 self.tasks = {} self.thread_name = threading.current_thread().getName() self.__init_thread_pool(thread_num) # 線程池初始化 def __init_thread_pool(self, thread_num): # 下載線程 for i in range(thread_num): self.threads.append(WorkThread(self)) # 打印進(jìn)度信息線程 self.threads.append(PrintProgress(self)) # 保存線程 self.threads.append(SaveToSqlite(self, self.args.dbfile)) # 添加下載任務(wù) def add_task(self, func, url, deep): # 記錄任務(wù),判斷是否已經(jīng)下載過(guò) url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest() if not url_hash in self.tasks: self.tasks[url_hash] = url self.work_queue.put((func, url, deep)) logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8"))) # 獲取下載任務(wù) def get_task(self): # 從隊(duì)列里取元素,如果block=True,則一直阻塞到有可用元素為止。 task = self.work_queue.get(block=False) return task def task_done(self): # 表示隊(duì)列中的某個(gè)元素已經(jīng)執(zhí)行完畢。 self.work_queue.task_done() # 開(kāi)始任務(wù) def start_task(self): for item in self.threads: item.start() logging.debug("Work start") def increase_success(self): self.success += 1 def increase_failure(self): self.failure += 1 def increase_running(self): self.running += 1 def decrease_running(self): self.running -= 1 def get_running(self): return self.running # 打印執(zhí)行信息 def get_progress_info(self): progress_info = {} progress_info['work_queue_number'] = self.work_queue.qsize() progress_info['tasks_number'] = len(self.tasks) progress_info['save_queue_number'] = self.save_queue.qsize() progress_info['success'] = self.success progress_info['failure'] = self.failure return progress_info def add_save_task(self, url, html): self.save_queue.put((url, html)) def get_save_task(self): save_task = self.save_queue.get(block=False) return save_task def wait_all_complete(self): for item in self.threads: if item.isAlive(): # join函數(shù)的意義,只有當(dāng)前執(zhí)行join函數(shù)的線程結(jié)束,程序才能接著執(zhí)行下去 item.join() # WorkThread 繼承自threading.Thread class WorkThread(threading.Thread): # 這里的thread_pool就是上面的ThreadPool類 def __init__(self, thread_pool): threading.Thread.__init__(self) self.thread_pool = thread_pool #定義線程功能方法,即,當(dāng)thread_1,...,thread_n,調(diào)用start()之后,執(zhí)行的操作。 def run(self): print (threading.current_thread().getName()) while True: try: # get_task()獲取從工作隊(duì)列里獲取當(dāng)前正在下載的線程,格式為func,url,deep do, url, deep = self.thread_pool.get_task() self.thread_pool.increase_running() # 判斷deep,是否獲取新的鏈接 flag_get_new_link = True if deep >= self.thread_pool.args.deep: flag_get_new_link = False #學(xué)習(xí)中遇到問(wèn)題沒(méi)人解答?小編創(chuàng)建了一個(gè)Python學(xué)習(xí)交流群:711312441 # 此處do為工作隊(duì)列傳過(guò)來(lái)的func,返回值為一個(gè)頁(yè)面內(nèi)容和這個(gè)頁(yè)面上所有的新鏈接 html, new_link = do(url, self.thread_pool.args, flag_get_new_link) if html == '': self.thread_pool.increase_failure() else: self.thread_pool.increase_success() # html添加到待保存隊(duì)列 self.thread_pool.add_save_task(url, html) # 添加新任務(wù),即,將新頁(yè)面上的不重復(fù)的鏈接加入工作隊(duì)列。 if new_link: for url in new_link: self.thread_pool.add_task(do, url, deep + 1) self.thread_pool.decrease_running() # self.thread_pool.task_done() except Queue.Empty: if self.thread_pool.get_running() <= 0: break except Exception, e: self.thread_pool.decrease_running() # print str(e) break
安裝vthread函數(shù)庫(kù)系統(tǒng)命令行下執(zhí)行:
pip install vthread
一句話實(shí)現(xiàn)簡(jiǎn)單多線程
import vthread,requests @vthread.thread(5) #開(kāi)5個(gè)線程執(zhí)行同一個(gè)函數(shù) def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") compete("http://www.baidu.com/")
相同效果:
import vthread,requests @vthread.thread def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") for i in range(5): #線程數(shù) compete("http://www.baidu.com/")
線程池包裝
import vthread,requests @vthread.pool(10) #包裝10條線程池 def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") for i in range(20): #20線程 compete("http://www.baidu.com/")
到此這篇關(guān)于Python開(kāi)啟線程和線程池的方法的文章就介紹到這了,更多相關(guān)Python開(kāi)啟線程和線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
教你用Python創(chuàng)建微信聊天機(jī)器人
這篇文章主要手把手教你用Python創(chuàng)建微信聊天機(jī)器人,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03python如何實(shí)現(xiàn)數(shù)組元素兩兩相加
這篇文章主要介紹了python如何實(shí)現(xiàn)數(shù)組元素兩兩相加,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05python 采用paramiko 遠(yuǎn)程執(zhí)行命令及報(bào)錯(cuò)解決
這篇文章主要介紹了python 采用paramiko 遠(yuǎn)程執(zhí)行命令及報(bào)錯(cuò)解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10Python+OpenCV實(shí)現(xiàn)圖像的全景拼接
這篇文章主要為大家詳細(xì)介紹了Python+OpenCV實(shí)現(xiàn)圖像的全景拼接,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03python GUI庫(kù)圖形界面開(kāi)發(fā)之PyQt5控件數(shù)據(jù)拖曳Drag與Drop詳細(xì)使用方法與實(shí)例
這篇文章主要介紹了python GUI庫(kù)圖形界面開(kāi)發(fā)之PyQt5控件數(shù)據(jù)拖曳Drag與Drop詳細(xì)使用方法與實(shí)例,需要的朋友可以參考下2020-02-02Django多數(shù)據(jù)庫(kù)配置及逆向生成model教程
這篇文章主要介紹了Django多數(shù)據(jù)庫(kù)配置及逆向生成model教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03Python利用wxPython模塊打造ChatGPT式打字效果程序
這篇文章主要為大家介紹了如何利用Python和wxPython模塊打造一個(gè)ChatGPT式打字效果程序,從而增強(qiáng)用戶體驗(yàn)或提高應(yīng)用程序的可讀性,感興趣的可以了解一下2023-05-05OpenCV-Python使用分水嶺算法實(shí)現(xiàn)圖像的分割與提取
在圖像的處理過(guò)程中,經(jīng)常需要從圖像中將前景對(duì)象作為目標(biāo)圖像分割或者提取出來(lái)。本文就介紹了使用分水嶺算法實(shí)現(xiàn)圖像的分割與提取,感興趣的可以了解一下2021-06-06