python線程池 ThreadPoolExecutor 的用法示例
前言
從Python3.2開(kāi)始,標(biāo)準(zhǔn)庫(kù)為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進(jìn)程池)兩個(gè)類。
相比 threading 等模塊,該模塊通過(guò) submit 返回的是一個(gè) future 對(duì)象,它是一個(gè)未來(lái)可期的對(duì)象,通過(guò)它可以獲悉線程的狀態(tài)主線程(或進(jìn)程)中可以獲取某一個(gè)線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個(gè)任務(wù)執(zhí)行的狀態(tài)及返回值:
主線程可以獲取某一個(gè)線程(或者任務(wù)的)的狀態(tài),以及返回值。
當(dāng)一個(gè)線程完成的時(shí)候,主線程能夠立即知道。
讓多線程和多進(jìn)程的編碼接口一致。
線程池的基本使用
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: # 創(chuàng)建一個(gè)最大容納數(shù)量為5的線程池 task1 = t.submit(spider, 1) task2 = t.submit(spider, 2) # 通過(guò)submit提交執(zhí)行的函數(shù)到線程池中 task3 = t.submit(spider, 3) print(f"task1: {task1.done()}") # 通過(guò)done來(lái)判斷線程是否完成 print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") time.sleep(2.5) print(f"task1: {task1.done()}") print(f"task2: {task2.done()}") print(f"task3: {task3.done()}") print(task1.result()) # 通過(guò)result來(lái)獲取返回值
執(zhí)行結(jié)果如下:
task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished
1.使用 with 語(yǔ)句 ,通過(guò) ThreadPoolExecutor 構(gòu)造實(shí)例,同時(shí)傳入 max_workers 參數(shù)來(lái)設(shè)置線程池中最多能同時(shí)運(yùn)行的線程數(shù)目。
2.使用 submit 函數(shù)來(lái)提交線程需要執(zhí)行的任務(wù)到線程池中,并返回該任務(wù)的句柄(類似于文件、畫(huà)圖),注意 submit() 不是阻塞的,而是立即返回。
3.通過(guò)使用 done() 方法判斷該任務(wù)是否結(jié)束。上面的例子可以看出,提交任務(wù)后立即判斷任務(wù)狀態(tài),顯示四個(gè)任務(wù)都未完成。在延時(shí)2.5后,task1 和 task2 執(zhí)行完畢,task3 仍在執(zhí)行中。
4.使用 result() 方法可以獲取任務(wù)的返回值。
主要方法
- wait
wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait 接受三個(gè)參數(shù):
fs: 表示需要執(zhí)行的序列
timeout: 等待的最大時(shí)間,如果超過(guò)這個(gè)時(shí)間即使線程未執(zhí)行完成也將返回
return_when:表示wait返回結(jié)果的條件,默認(rèn)為 ALL_COMPLETED 全部執(zhí)行完成再返回
還是用上面那個(gè)例子來(lái)熟悉用法
示例:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page with ThreadPoolExecutor(max_workers=5) as t: all_task = [t.submit(spider, page) for page in range(1, 5)] wait(all_task, return_when=FIRST_COMPLETED) print('finished') print(wait(all_task, timeout=2.5)) # 運(yùn)行結(jié)果 crawl task1 finished finished crawl task2 finished crawl task3 finished DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>}) crawl task4 finished
1.代碼中返回的條件是:當(dāng)完成第一個(gè)任務(wù)的時(shí)候,就停止等待,繼續(xù)主線程任務(wù)
2.由于設(shè)置了延時(shí), 可以看到最后只有 task4 還在運(yùn)行中
- as_completed
上面雖然提供了判斷任務(wù)是否結(jié)束的方法,但是不能在主線程中一直判斷啊。最好的方法是當(dāng)某個(gè)任務(wù)結(jié)束了,就給主線程返回結(jié)果,而不是一直判斷每個(gè)任務(wù)是否結(jié)束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個(gè)方法,當(dāng)子線程中的任務(wù)執(zhí)行完后,直接用 result() 獲取返回結(jié)果
用法如下:
# coding: utf-8 from concurrent.futures import ThreadPoolExecutor, as_completed import time def spider(page): time.sleep(page) print(f"crawl task{page} finished") return page def main(): with ThreadPoolExecutor(max_workers=5) as t: obj_list = [] for page in range(1, 5): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(f"main: {data}") # 執(zhí)行結(jié)果 crawl task1 finished main: 1 crawl task2 finished main: 2 crawl task3 finished main: 3 crawl task4 finished main: 4
as_completed() 方法是一個(gè)生成器,在沒(méi)有任務(wù)完成的時(shí)候,會(huì)一直阻塞,除非設(shè)置了 timeout。
當(dāng)有某個(gè)任務(wù)完成的時(shí)候,會(huì) yield 這個(gè)任務(wù),就能執(zhí)行 for 循環(huán)下面的語(yǔ)句,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束。同時(shí),先完成的任務(wù)會(huì)先返回給主線程。
- map
map(fn, *iterables, timeout=None)
fn: 第一個(gè)參數(shù) fn 是需要線程執(zhí)行的函數(shù);
iterables:第二個(gè)參數(shù)接受一個(gè)可迭代對(duì)象;
timeout: 第三個(gè)參數(shù) timeout 跟 wait() 的 timeout 一樣,但由于 map 是返回線程執(zhí)行的結(jié)果,如果 timeout小于線程執(zhí)行時(shí)間會(huì)拋異常 TimeoutError。
用法如下:
import time from concurrent.futures import ThreadPoolExecutor def spider(page): time.sleep(page) return page start = time.time() executor = ThreadPoolExecutor(max_workers=4) i = 1 for result in executor.map(spider, [2, 3, 1, 4]): print("task{}:{}".format(i, result)) i += 1 # 運(yùn)行結(jié)果 task1:2 task2:3 task3:1 task4:4
使用 map 方法,無(wú)需提前使用 submit 方法,map 方法與 python 高階函數(shù) map 的含義相同,都是將序列中的每個(gè)元素都執(zhí)行同一個(gè)函數(shù)。
上面的代碼對(duì)列表中的每個(gè)元素都執(zhí)行 spider() 函數(shù),并分配各線程池。
可以看到執(zhí)行結(jié)果與上面的 as_completed() 方法的結(jié)果不同,輸出順序和列表的順序相同,就算 1s 的任務(wù)先執(zhí)行完成,也會(huì)先打印前面提交的任務(wù)返回的結(jié)果。
多線程實(shí)戰(zhàn)
以某網(wǎng)站為例,演示線程池和單線程兩種方式爬取的差異
# coding: utf-8 import requests from concurrent.futures import ThreadPoolExecutor, as_completed import time import json from requests import adapters from proxy import get_proxies headers = { "Host": "splcgk.court.gov.cn", "Origin": "https://splcgk.court.gov.cn", "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36", "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg", } url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1" def spider(page): data = { "bt": "", "fydw": "", "pageNum": page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_data def main(): with ThreadPoolExecutor(max_workers=10) as t: obj_list = [] begin = time.time() for page in range(1, 15): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": main()
運(yùn)行結(jié)果:
單線程實(shí)戰(zhàn)
下面我們可以使用單線程來(lái)爬取,代碼基本和上面的一樣,加個(gè)單線程函數(shù)
代碼如下:
# coding: utf-8 import requests from concurrent.futures import ThreadPoolExecutor, as_completed import time import json from requests import adapters from proxy import get_proxies headers = { "Host": "splcgk.court.gov.cn", "Origin": "https://splcgk.court.gov.cn", "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36", "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg", } url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1" def spider(page): data = { "bt": "", "fydw": "", "pageNum": page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_data def single(): begin = time.time() for page in range(1, 15): data = spider(page) print(data) print('*' * 50) times = time.time() - begin print(times) if __name__ == "__main__": single()
運(yùn)行結(jié)果:
可以看到,總共花了 19 秒。真是肉眼可見(jiàn)的差距??!如果數(shù)據(jù)量大的話,運(yùn)行時(shí)間差距會(huì)更大!
以上就是python線程池 ThreadPoolExecutor 的用法示例的詳細(xì)內(nèi)容,更多關(guān)于python線程池 ThreadPoolExecutor 的用法及實(shí)戰(zhàn)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python安裝模塊如何通過(guò)setup.py安裝(超簡(jiǎn)單)
這篇文章主要介紹了python安裝模塊如何通過(guò)setup.py安裝,安裝方法其實(shí)很簡(jiǎn)單,感興趣的朋友跟隨腳本之家小編一起看看吧2018-05-05Python安裝第三方庫(kù)及常見(jiàn)問(wèn)題處理方法匯總
本文給大家匯總介紹了Python安裝第三方庫(kù)及常見(jiàn)問(wèn)題處理方法,非常的簡(jiǎn)單使用,有需要的小伙伴可以參考下2016-09-09Python對(duì)接 xray 和微信實(shí)現(xiàn)自動(dòng)告警
xray 是從長(zhǎng)亭洞鑒核心引擎中提取出的社區(qū)版漏洞掃描神器,支持主動(dòng)、被動(dòng)多種掃描方式,自備盲打平臺(tái)、可以靈活定義 POC,功能豐富,這篇文章主要介紹了對(duì)接 xray 和微信實(shí)現(xiàn)自動(dòng)告警,需要的朋友可以參考下2019-09-09Flask項(xiàng)目中實(shí)現(xiàn)短信驗(yàn)證碼和郵箱驗(yàn)證碼功能
這篇文章主要介紹了Flask項(xiàng)目中實(shí)現(xiàn)短信驗(yàn)證碼和郵箱驗(yàn)證碼功能,需本文通過(guò)截圖實(shí)例代碼的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2019-12-12Python機(jī)器學(xué)習(xí)庫(kù)scikit-learn安裝與基本使用教程
這篇文章主要介紹了Python機(jī)器學(xué)習(xí)庫(kù)scikit-learn安裝與基本使用,較為詳細(xì)的介紹了機(jī)器學(xué)習(xí)庫(kù)scikit-learn的功能、原理、基本安裝與簡(jiǎn)單使用方法,需要的朋友可以參考下2018-06-06趣味Python實(shí)戰(zhàn)練習(xí)之自動(dòng)更換桌面壁紙腳本附源碼
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,學(xué)的扎不扎實(shí)要通過(guò)實(shí)戰(zhàn)才能看出來(lái),本篇文章手把手帶你編寫(xiě)一個(gè)自動(dòng)更換桌面壁紙的腳本,代碼簡(jiǎn)潔而且短,相信你一定看得懂,大家可以在過(guò)程中查缺補(bǔ)漏,看看自己掌握程度怎么樣2021-10-10Python實(shí)現(xiàn)將doc轉(zhuǎn)化pdf格式文檔的方法
這篇文章主要介紹了Python實(shí)現(xiàn)將doc轉(zhuǎn)化pdf格式文檔的方法,結(jié)合實(shí)例形式分析了Python實(shí)現(xiàn)doc格式文件讀取及轉(zhuǎn)換pdf格式文件的操作技巧,以及php調(diào)用py文件的具體實(shí)現(xiàn)方法,需要的朋友可以參考下2018-01-01