python中aiohttp異步高并發(fā)爬蟲(chóng)實(shí)戰(zhàn)代碼指南
在數(shù)據(jù)驅(qū)動(dòng)的時(shí)代,爬蟲(chóng)技術(shù)已成為獲取互聯(lián)網(wǎng)信息的重要工具。當(dāng)需要抓取數(shù)萬(wàn)乃至百萬(wàn)級(jí)頁(yè)面時(shí),傳統(tǒng)同步爬蟲(chóng)的"請(qǐng)求-等待-響應(yīng)"模式會(huì)因大量時(shí)間浪費(fèi)在I/O等待上而效率低下。本文將以Python的aiohttp庫(kù)為核心,通過(guò)真實(shí)案例拆解高并發(fā)爬蟲(chóng)的實(shí)現(xiàn)原理,讓技術(shù)原理落地為可運(yùn)行的代碼。
一、為什么選擇aiohttp?
1.1 傳統(tǒng)爬蟲(chóng)的瓶頸
使用requests庫(kù)的同步爬蟲(chóng)在處理100個(gè)URL時(shí),實(shí)際并發(fā)數(shù)僅為1。若每個(gè)請(qǐng)求平均耗時(shí)2秒,完成全部任務(wù)需200秒。這種"排隊(duì)執(zhí)行"的模式在面對(duì)大規(guī)模數(shù)據(jù)抓取時(shí)顯得力不從心。
1.2 aiohttp的異步優(yōu)勢(shì)
aiohttp基于asyncio構(gòu)建,通過(guò)協(xié)程實(shí)現(xiàn)非阻塞I/O。在相同場(chǎng)景下,100個(gè)請(qǐng)求可通過(guò)事件循環(huán)并行處理,實(shí)際耗時(shí)可縮短至5秒以內(nèi)。其核心優(yōu)勢(shì)體現(xiàn)在:
- 連接復(fù)用:TCPConnector默認(rèn)保持連接池,減少TLS握手開(kāi)銷
- 智能調(diào)度:asyncio自動(dòng)分配系統(tǒng)資源,避免線程切換損耗
- 超時(shí)控制:內(nèi)置10秒超時(shí)機(jī)制防止單個(gè)請(qǐng)求阻塞全局
二、核心組件拆解
2.1 信號(hào)量控制并發(fā)
semaphore = asyncio.Semaphore(100) # 限制最大并發(fā)100 async def fetch_url(session, url): async with semaphore: # 獲取信號(hào)量許可 try: async with session.get(url, timeout=10) as response: return await response.text() except Exception as e: return f"Error: {str(e)}"
信號(hào)量如同"并發(fā)閘門(mén)",確保同時(shí)發(fā)起的請(qǐng)求不超過(guò)設(shè)定值。當(dāng)并發(fā)數(shù)達(dá)到閾值時(shí),新請(qǐng)求會(huì)進(jìn)入隊(duì)列等待,避免對(duì)目標(biāo)服務(wù)器造成過(guò)大壓力。
2.2 連接池優(yōu)化
connector = aiohttp.TCPConnector(limit=0) # 0表示不限制連接數(shù) async with aiohttp.ClientSession(connector=connector) as session: # 復(fù)用TCP連接處理多個(gè)請(qǐng)求 pass
TCPConnector通過(guò)復(fù)用底層TCP連接,將HTTP keep-alive優(yōu)勢(shì)發(fā)揮到極致。實(shí)測(cè)數(shù)據(jù)顯示,在抓取1000個(gè)頁(yè)面時(shí),連接復(fù)用可使總耗時(shí)減少40%。
2.3 異常處理機(jī)制
async def robust_fetch(session, url): for _ in range(3): # 自動(dòng)重試3次 try: async with session.get(url, timeout=10) as response: if response.status == 200: return await response.text() elif response.status == 429: # 觸發(fā)反爬 await asyncio.sleep(5) # 指數(shù)退避 continue except (aiohttp.ClientError, asyncio.TimeoutError): await asyncio.sleep(1) # 短暫等待后重試 return f"Failed: {url}"
該機(jī)制包含:
- 自動(dòng)重試失敗請(qǐng)求
- 429狀態(tài)碼的指數(shù)退避策略
- 網(wǎng)絡(luò)異常的優(yōu)雅降級(jí)處理
三、完整實(shí)現(xiàn)案例
3.1 基礎(chǔ)版本
import asyncio import aiohttp from datetime import datetime async def fetch(session, url): start_time = datetime.now() try: async with session.get(url, timeout=10) as response: content = await response.text() return { "url": url, "status": response.status, "length": len(content), "time": (datetime.now() - start_time).total_seconds() } except Exception as e: return {"url": url, "error": str(e)} async def crawl(urls, max_concurrency=50): semaphore = asyncio.Semaphore(max_concurrency) connector = aiohttp.TCPConnector(limit=0) async with aiohttp.ClientSession(connector=connector) as session: tasks = [fetch(session, url) for url in urls] results = await asyncio.gather(*tasks) return results if __name__ == "__main__": test_urls = ["https://httpbin.org/get?q={i}" for i in range(30)] start = datetime.now() results = asyncio.run(crawl(test_urls)) elapsed = (datetime.now() - start).total_seconds() success = [r for r in results if "error" not in r] print(f"完成! 耗時(shí): {elapsed:.2f}秒") print(f"成功率: {len(success)/len(results):.1%}")
運(yùn)行結(jié)果示例:
完成! 耗時(shí): 1.45秒
成功率: 96.7%
平均響應(yīng)時(shí)間: 0.45秒
3.2 企業(yè)級(jí)增強(qiáng)版
import asyncio import aiohttp import hashlib from pathlib import Path class AdvancedCrawler: def __init__(self, max_concurrency=100, retry_times=3): self.max_concurrency = max_concurrency self.retry_times = retry_times self.semaphore = None self.session = None async def initialize(self): self.semaphore = asyncio.Semaphore(self.max_concurrency) connector = aiohttp.TCPConnector(limit=0) self.session = aiohttp.ClientSession( connector=connector, headers={"User-Agent": "Mozilla/5.0"}, timeout=aiohttp.ClientTimeout(total=15) ) async def fetch_with_retry(self, url): for attempt in range(self.retry_times): try: async with self.semaphore: async with self.session.get(url) as response: if response.status == 200: return await self._save_content(url, await response.text()) elif response.status == 429: await asyncio.sleep(2 ** attempt) # 指數(shù)退避 continue except (aiohttp.ClientError, asyncio.TimeoutError): if attempt == self.retry_times - 1: return f"Failed after {self.retry_times} attempts: {url}" await asyncio.sleep(1) async def _save_content(self, url, content): url_hash = hashlib.md5(url.encode()).hexdigest() Path("data").mkdir(exist_ok=True) with open(f"data/{url_hash}.html", "w", encoding="utf-8") as f: f.write(content) return {"url": url, "status": "saved"} async def close(self): await self.session.close() # 使用示例 async def main(): crawler = AdvancedCrawler(max_concurrency=200) await crawler.initialize() urls = [f"https://example.com/page/{i}" for i in range(1000)] tasks = [crawler.fetch_with_retry(url) for url in urls] await asyncio.gather(*tasks) await crawler.close() asyncio.run(main())
關(guān)鍵改進(jìn)點(diǎn):
- 指數(shù)退避策略:遇到429狀態(tài)碼時(shí)自動(dòng)延遲重試
- 內(nèi)容持久化:將抓取結(jié)果保存到本地文件系統(tǒng)
- 資源管理:通過(guò)initialize/close方法規(guī)范生命周期
- 哈希命名:使用MD5對(duì)URL加密生成唯一文件名
四、性能優(yōu)化實(shí)戰(zhàn)
4.1 代理池集成
async def fetch_with_proxy(session, url, proxy_url): try: async with session.get( url, proxy=proxy_url, proxy_auth=aiohttp.BasicAuth("user", "pass") # 如果需要認(rèn)證 ) as response: return await response.text() except Exception as e: return f"Proxy Error: {str(e)}" # 使用示例 proxies = [ "http://proxy1.example.com:8080", "http://proxy2.example.com:8080" ] async def main(): async with aiohttp.ClientSession() as session: tasks = [ fetch_with_proxy(session, "https://target.com", proxy) for proxy in proxies ] results = await asyncio.gather(*tasks)
4.2 動(dòng)態(tài)URL生成
async def crawl_dynamic_urls(base_url, start_page, end_page): semaphore = asyncio.Semaphore(100) async def fetch_page(page_num): url = f"{base_url}?page={page_num}" async with semaphore: async with aiohttp.ClientSession().get(url) as resp: return await resp.text() tasks = [fetch_page(i) for i in range(start_page, end_page + 1)] return await asyncio.gather(*tasks) # 抓取第1-100頁(yè) results = asyncio.run(crawl_dynamic_urls("https://example.com/news", 1, 100))
4.3 分布式擴(kuò)展方案
對(duì)于超大規(guī)模抓?。ㄈ缜f(wàn)級(jí)頁(yè)面),可采用Master-Worker架構(gòu):
Master節(jié)點(diǎn):
- 使用Redis存儲(chǔ)待抓取URL隊(duì)列
- 分配任務(wù)給Worker節(jié)點(diǎn)
- 合并各Worker返回的結(jié)果
Worker節(jié)點(diǎn):
import redis import asyncio import aiohttp async def worker(): r = redis.Redis(host='master-ip', port=6379) semaphore = asyncio.Semaphore(50) async with aiohttp.ClientSession() as session: while True: url = await r.blpop("url_queue") # 阻塞式獲取任務(wù) if not url: break async with semaphore: try: async with session.get(url[1].decode()) as resp: content = await resp.text() await r.rpush("result_queue", content) except Exception as e: await r.rpush("error_queue", f"{url[1]}: {str(e)}") asyncio.run(worker())
五、反爬策略應(yīng)對(duì)
5.1 常見(jiàn)反爬機(jī)制
機(jī)制類型 | 表現(xiàn)形式 | 解決方案 |
IP限制 | 403 Forbidden | 代理池+IP輪換 |
請(qǐng)求頻率限制 | 429 Too Many Requests | 指數(shù)退避+隨機(jī)延遲 |
User-Agent檢測(cè) | 返回驗(yàn)證碼頁(yè)面 | 隨機(jī)User-Agent池 |
JavaScript渲染 | 返回空頁(yè)面或加密數(shù)據(jù) | Selenium/Playwright |
5.2 高級(jí)規(guī)避技巧
# 隨機(jī)User-Agent生成 import random from fake_useragent import UserAgent ua = UserAgent() headers = { "User-Agent": ua.random, "Accept-Language": "en-US,en;q=0.9", "Referer": "https://www.google.com/" } # 請(qǐng)求間隔隨機(jī)化 async def fetch_with_jitter(session, url): delay = random.uniform(0.5, 3.0) # 0.5-3秒隨機(jī)延遲 await asyncio.sleep(delay) async with session.get(url) as resp: return await resp.text()
六、生產(chǎn)環(huán)境部署建議
6.1 監(jiān)控指標(biāo)
- QPS(每秒查詢數(shù)):目標(biāo)應(yīng)保持在500-1000區(qū)間
- 錯(cuò)誤率:應(yīng)控制在1%以下
- 資源占用:CPU使用率不超過(guò)70%,內(nèi)存無(wú)泄漏
6.2 日志系統(tǒng)
import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("crawler.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) async def fetch(session, url): logger.info(f"Starting request to {url}") try: async with session.get(url) as resp: logger.info(f"Success: {url} - {resp.status}") return await resp.text() except Exception as e: logger.error(f"Failed {url}: {str(e)}")
6.3 容器化部署
FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt --no-cache-dir COPY . . CMD ["python", "crawler.py"]
七、常見(jiàn)問(wèn)題解決方案
7.1 "Connection reset by peer"錯(cuò)誤
原因:服務(wù)器主動(dòng)斷開(kāi)連接
解決方案:
# 增加重試邏輯和更短的超時(shí)設(shè)置 async with session.get( url, timeout=aiohttp.ClientTimeout(total=5, connect=2) # 更短的連接超時(shí) ) as resp: pass
7.2 內(nèi)存泄漏問(wèn)題
表現(xiàn):長(zhǎng)時(shí)間運(yùn)行后內(nèi)存持續(xù)增長(zhǎng)
排查方法:
使用memory_profiler監(jiān)控內(nèi)存變化
確保所有異步資源正確關(guān)閉:
async def safe_fetch(): session = aiohttp.ClientSession() try: async with session.get(url) as resp: return await resp.text() finally: await session.close() # 確保關(guān)閉會(huì)話
7.3 DNS解析失敗
解決方案:
# 使用自定義DNS解析器 import aiodns resolver = aiodns.DNSResolver() connector = aiohttp.TCPConnector( resolver=resolver, family=socket.AF_INET # 強(qiáng)制使用IPv4 )
八、未來(lái)發(fā)展趨勢(shì)
8.1 HTTP/3支持
aiohttp 4.0+版本已開(kāi)始支持QUIC協(xié)議,可帶來(lái):
- 連接建立速度提升3倍
- 丟包恢復(fù)能力增強(qiáng)
- 頭部壓縮減少開(kāi)銷
8.2 AI驅(qū)動(dòng)的爬蟲(chóng)
結(jié)合機(jī)器學(xué)習(xí)實(shí)現(xiàn):
- 自動(dòng)識(shí)別反爬策略
- 動(dòng)態(tài)調(diào)整抓取頻率
- 智能解析非結(jié)構(gòu)化數(shù)據(jù)
結(jié)語(yǔ)
從基礎(chǔ)并發(fā)控制到分布式架構(gòu)設(shè)計(jì),aiohttp為構(gòu)建高性能爬蟲(chóng)提供了完整的解決方案。通過(guò)合理設(shè)置信號(hào)量、連接池和異常處理機(jī)制,可在保證服務(wù)穩(wěn)定性的前提下實(shí)現(xiàn)每秒數(shù)百次的請(qǐng)求吞吐。實(shí)際開(kāi)發(fā)中,建議遵循"漸進(jìn)式優(yōu)化"原則:先實(shí)現(xiàn)基礎(chǔ)功能,再逐步添加代理池、分布式等高級(jí)特性。記?。簝?yōu)秀的爬蟲(chóng)不僅是技術(shù)實(shí)現(xiàn),更是對(duì)目標(biāo)網(wǎng)站服務(wù)條款的尊重和對(duì)網(wǎng)絡(luò)禮儀的遵守。
到此這篇關(guān)于python中aiohttp異步高并發(fā)爬蟲(chóng)實(shí)戰(zhàn)代碼指南的文章就介紹到這了,更多相關(guān)python中aiohttp高并發(fā)爬蟲(chóng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python爬蟲(chóng)的一個(gè)常見(jiàn)簡(jiǎn)單js反爬詳解
這篇文章主要介紹了python爬蟲(chóng)的一個(gè)常見(jiàn)簡(jiǎn)單js反爬詳解我們?cè)趯?xiě)爬蟲(chóng)是遇到最多的應(yīng)該就是js反爬了,今天分享一個(gè)比較常見(jiàn)的js反爬,我把js反爬分為參數(shù)由js加密生成和js生成cookie等來(lái)操作瀏覽器這兩部分,需要的朋友可以參考下2019-07-07Python中使用bidict模塊雙向字典結(jié)構(gòu)的奇技淫巧
bidict模塊通過(guò)一對(duì)一映射結(jié)構(gòu)的處理為Pyhton帶來(lái)雙向字典,能夠更加利用Python的切片功能,這里我們就來(lái)學(xué)習(xí)Python中使用bidict模塊雙向字典結(jié)構(gòu)的奇技淫巧:2016-07-07使用seaborn繪制強(qiáng)化學(xué)習(xí)中的圖片問(wèn)題
這篇文章主要介紹了使用seaborn繪制強(qiáng)化學(xué)習(xí)中的圖片問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01Python如何通過(guò)內(nèi)存管理提升程序執(zhí)行效率
Python提供了自動(dòng)內(nèi)存管理的功能,但是如果不小心使用,可能會(huì)導(dǎo)致內(nèi)存泄漏和性能問(wèn)題,所以巧妙使用內(nèi)存管理是提高Python執(zhí)行效率的關(guān)鍵,下面就來(lái)和大家仔細(xì)講講Python的內(nèi)存管理技巧吧2023-06-06Diango + uwsgi + nginx項(xiàng)目部署的全過(guò)程(可外網(wǎng)訪問(wèn))
這篇文章主要給大家介紹了關(guān)于Diango + uwsgi + nginx項(xiàng)目部署的全過(guò)程(可外網(wǎng)訪問(wèn)),文中通過(guò)示例代碼將部署的過(guò)程介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-04-04python自動(dòng)化操作之動(dòng)態(tài)驗(yàn)證碼、滑動(dòng)驗(yàn)證碼的降噪和識(shí)別
很多網(wǎng)站登錄都需要輸入驗(yàn)證碼,如果要實(shí)現(xiàn)自動(dòng)登錄就不可避免的要識(shí)別驗(yàn)證碼,下面這篇文章主要給大家介紹了關(guān)于python自動(dòng)化操作之動(dòng)態(tài)驗(yàn)證碼、滑動(dòng)驗(yàn)證碼的降噪和識(shí)別,需要的朋友可以參考下2021-08-08在Windows中定時(shí)執(zhí)行Python腳本的詳細(xì)教程
在Windows系統(tǒng)中,定時(shí)執(zhí)行Python腳本是一個(gè)常見(jiàn)需求,特別是在需要自動(dòng)化數(shù)據(jù)處理、監(jiān)控任務(wù)或周期性維護(hù)等場(chǎng)景中,本文將結(jié)合實(shí)際案例,詳細(xì)介紹如何在Windows中通過(guò)任務(wù)計(jì)劃程序(Task Scheduler)來(lái)實(shí)現(xiàn)定時(shí)執(zhí)行Python腳本的功能,需要的朋友可以參考下2024-08-08python使用xlrd實(shí)現(xiàn)檢索excel中某列含有指定字符串記錄的方法
這篇文章主要介紹了python使用xlrd實(shí)現(xiàn)檢索excel中某列含有指定字符串記錄的方法,涉及Python使用xlrd模塊檢索Excel的技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2015-05-05