欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python中aiohttp異步高并發(fā)爬蟲實戰(zhàn)代碼指南

 更新時間:2025年07月19日 14:17:55   作者:1站大爺IP  
本文詳解python中aiohttp異步爬蟲技術(shù),通過信號量、連接池和異常處理優(yōu)化高并發(fā)效率,解決傳統(tǒng)同步爬蟲I/O等待瓶頸,結(jié)合代理池、分布式架構(gòu)應(yīng)對反爬策略,強(qiáng)調(diào)技術(shù)落地與合規(guī)性平衡

 在數(shù)據(jù)驅(qū)動的時代,爬蟲技術(shù)已成為獲取互聯(lián)網(wǎng)信息的重要工具。當(dāng)需要抓取數(shù)萬乃至百萬級頁面時,傳統(tǒng)同步爬蟲的"請求-等待-響應(yīng)"模式會因大量時間浪費(fèi)在I/O等待上而效率低下。本文將以Python的aiohttp庫為核心,通過真實案例拆解高并發(fā)爬蟲的實現(xiàn)原理,讓技術(shù)原理落地為可運(yùn)行的代碼。

一、為什么選擇aiohttp?

1.1 傳統(tǒng)爬蟲的瓶頸

使用requests庫的同步爬蟲在處理100個URL時,實際并發(fā)數(shù)僅為1。若每個請求平均耗時2秒,完成全部任務(wù)需200秒。這種"排隊執(zhí)行"的模式在面對大規(guī)模數(shù)據(jù)抓取時顯得力不從心。

1.2 aiohttp的異步優(yōu)勢

aiohttp基于asyncio構(gòu)建,通過協(xié)程實現(xiàn)非阻塞I/O。在相同場景下,100個請求可通過事件循環(huán)并行處理,實際耗時可縮短至5秒以內(nèi)。其核心優(yōu)勢體現(xiàn)在:

  • 連接復(fù)用:TCPConnector默認(rèn)保持連接池,減少TLS握手開銷
  • 智能調(diào)度:asyncio自動分配系統(tǒng)資源,避免線程切換損耗
  • 超時控制:內(nèi)置10秒超時機(jī)制防止單個請求阻塞全局

二、核心組件拆解

2.1 信號量控制并發(fā)

semaphore = asyncio.Semaphore(100)  # 限制最大并發(fā)100
 
async def fetch_url(session, url):
    async with semaphore:  # 獲取信號量許可
        try:
            async with session.get(url, timeout=10) as response:
                return await response.text()
        except Exception as e:
            return f"Error: {str(e)}"

信號量如同"并發(fā)閘門",確保同時發(fā)起的請求不超過設(shè)定值。當(dāng)并發(fā)數(shù)達(dá)到閾值時,新請求會進(jìn)入隊列等待,避免對目標(biāo)服務(wù)器造成過大壓力。

2.2 連接池優(yōu)化

connector = aiohttp.TCPConnector(limit=0)  # 0表示不限制連接數(shù)
async with aiohttp.ClientSession(connector=connector) as session:
    # 復(fù)用TCP連接處理多個請求
    pass

TCPConnector通過復(fù)用底層TCP連接,將HTTP keep-alive優(yōu)勢發(fā)揮到極致。實測數(shù)據(jù)顯示,在抓取1000個頁面時,連接復(fù)用可使總耗時減少40%。

2.3 異常處理機(jī)制

async def robust_fetch(session, url):
    for _ in range(3):  # 自動重試3次
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status == 429:  # 觸發(fā)反爬
                    await asyncio.sleep(5)  # 指數(shù)退避
                    continue
        except (aiohttp.ClientError, asyncio.TimeoutError):
            await asyncio.sleep(1)  # 短暫等待后重試
    return f"Failed: {url}"

該機(jī)制包含:

  • 自動重試失敗請求
  • 429狀態(tài)碼的指數(shù)退避策略
  • 網(wǎng)絡(luò)異常的優(yōu)雅降級處理

三、完整實現(xiàn)案例

3.1 基礎(chǔ)版本

import asyncio
import aiohttp
from datetime import datetime
 
async def fetch(session, url):
    start_time = datetime.now()
    try:
        async with session.get(url, timeout=10) as response:
            content = await response.text()
            return {
                "url": url,
                "status": response.status,
                "length": len(content),
                "time": (datetime.now() - start_time).total_seconds()
            }
    except Exception as e:
        return {"url": url, "error": str(e)}
 
async def crawl(urls, max_concurrency=50):
    semaphore = asyncio.Semaphore(max_concurrency)
    connector = aiohttp.TCPConnector(limit=0)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
 
if __name__ == "__main__":
    test_urls = ["https://httpbin.org/get?q={i}" for i in range(30)]
    start = datetime.now()
    results = asyncio.run(crawl(test_urls))
    elapsed = (datetime.now() - start).total_seconds()
    
    success = [r for r in results if "error" not in r]
    print(f"完成! 耗時: {elapsed:.2f}秒")
    print(f"成功率: {len(success)/len(results):.1%}")

運(yùn)行結(jié)果示例:

完成! 耗時: 1.45秒
成功率: 96.7%
平均響應(yīng)時間: 0.45秒

3.2 企業(yè)級增強(qiáng)版

import asyncio
import aiohttp
import hashlib
from pathlib import Path
 
class AdvancedCrawler:
    def __init__(self, max_concurrency=100, retry_times=3):
        self.max_concurrency = max_concurrency
        self.retry_times = retry_times
        self.semaphore = None
        self.session = None
        
    async def initialize(self):
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        connector = aiohttp.TCPConnector(limit=0)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=aiohttp.ClientTimeout(total=15)
        )
    
    async def fetch_with_retry(self, url):
        for attempt in range(self.retry_times):
            try:
                async with self.semaphore:
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            return await self._save_content(url, await response.text())
                        elif response.status == 429:
                            await asyncio.sleep(2 ** attempt)  # 指數(shù)退避
                            continue
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if attempt == self.retry_times - 1:
                    return f"Failed after {self.retry_times} attempts: {url}"
                await asyncio.sleep(1)
    
    async def _save_content(self, url, content):
        url_hash = hashlib.md5(url.encode()).hexdigest()
        Path("data").mkdir(exist_ok=True)
        with open(f"data/{url_hash}.html", "w", encoding="utf-8") as f:
            f.write(content)
        return {"url": url, "status": "saved"}
    
    async def close(self):
        await self.session.close()
 
# 使用示例
async def main():
    crawler = AdvancedCrawler(max_concurrency=200)
    await crawler.initialize()
    
    urls = [f"https://example.com/page/{i}" for i in range(1000)]
    tasks = [crawler.fetch_with_retry(url) for url in urls]
    await asyncio.gather(*tasks)
    await crawler.close()
 
asyncio.run(main())

關(guān)鍵改進(jìn)點(diǎn):

  • 指數(shù)退避策略:遇到429狀態(tài)碼時自動延遲重試
  • 內(nèi)容持久化:將抓取結(jié)果保存到本地文件系統(tǒng)
  • 資源管理:通過initialize/close方法規(guī)范生命周期
  • 哈希命名:使用MD5對URL加密生成唯一文件名

四、性能優(yōu)化實戰(zhàn)

4.1 代理池集成

async def fetch_with_proxy(session, url, proxy_url):
    try:
        async with session.get(
            url,
            proxy=proxy_url,
            proxy_auth=aiohttp.BasicAuth("user", "pass")  # 如果需要認(rèn)證
        ) as response:
            return await response.text()
    except Exception as e:
        return f"Proxy Error: {str(e)}"
 
# 使用示例
proxies = [
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080"
]
 
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_proxy(session, "https://target.com", proxy)
            for proxy in proxies
        ]
        results = await asyncio.gather(*tasks)

4.2 動態(tài)URL生成

async def crawl_dynamic_urls(base_url, start_page, end_page):
    semaphore = asyncio.Semaphore(100)
    
    async def fetch_page(page_num):
        url = f"{base_url}?page={page_num}"
        async with semaphore:
            async with aiohttp.ClientSession().get(url) as resp:
                return await resp.text()
    
    tasks = [fetch_page(i) for i in range(start_page, end_page + 1)]
    return await asyncio.gather(*tasks)
 
# 抓取第1-100頁
results = asyncio.run(crawl_dynamic_urls("https://example.com/news", 1, 100))

4.3 分布式擴(kuò)展方案

對于超大規(guī)模抓?。ㄈ缜f級頁面),可采用Master-Worker架構(gòu):

Master節(jié)點(diǎn):

  • 使用Redis存儲待抓取URL隊列
  • 分配任務(wù)給Worker節(jié)點(diǎn)
  • 合并各Worker返回的結(jié)果

Worker節(jié)點(diǎn):

import redis
import asyncio
import aiohttp
 
async def worker():
    r = redis.Redis(host='master-ip', port=6379)
    semaphore = asyncio.Semaphore(50)
    
    async with aiohttp.ClientSession() as session:
        while True:
            url = await r.blpop("url_queue")  # 阻塞式獲取任務(wù)
            if not url:
                break
                
            async with semaphore:
                try:
                    async with session.get(url[1].decode()) as resp:
                        content = await resp.text()
                        await r.rpush("result_queue", content)
                except Exception as e:
                    await r.rpush("error_queue", f"{url[1]}: {str(e)}")
 
asyncio.run(worker())

五、反爬策略應(yīng)對

5.1 常見反爬機(jī)制

機(jī)制類型

表現(xiàn)形式

解決方案

IP限制

403 Forbidden

代理池+IP輪換

請求頻率限制

429 Too Many Requests

指數(shù)退避+隨機(jī)延遲

User-Agent檢測

返回驗證碼頁面

隨機(jī)User-Agent池

JavaScript渲染

返回空頁面或加密數(shù)據(jù)

Selenium/Playwright

5.2 高級規(guī)避技巧

# 隨機(jī)User-Agent生成
import random
from fake_useragent import UserAgent
 
ua = UserAgent()
headers = {
    "User-Agent": ua.random,
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/"
}
 
# 請求間隔隨機(jī)化
async def fetch_with_jitter(session, url):
    delay = random.uniform(0.5, 3.0)  # 0.5-3秒隨機(jī)延遲
    await asyncio.sleep(delay)
    async with session.get(url) as resp:
        return await resp.text()

六、生產(chǎn)環(huán)境部署建議

6.1 監(jiān)控指標(biāo)

  • QPS(每秒查詢數(shù)):目標(biāo)應(yīng)保持在500-1000區(qū)間
  • 錯誤率:應(yīng)控制在1%以下
  • 資源占用:CPU使用率不超過70%,內(nèi)存無泄漏

6.2 日志系統(tǒng)

import logging
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("crawler.log"),
        logging.StreamHandler()
    ]
)
 
logger = logging.getLogger(__name__)
 
async def fetch(session, url):
    logger.info(f"Starting request to {url}")
    try:
        async with session.get(url) as resp:
            logger.info(f"Success: {url} - {resp.status}")
            return await resp.text()
    except Exception as e:
        logger.error(f"Failed {url}: {str(e)}")

6.3 容器化部署

FROM python:3.9-slim
 
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir
 
COPY . .
CMD ["python", "crawler.py"]

七、常見問題解決方案

7.1 "Connection reset by peer"錯誤

原因:服務(wù)器主動斷開連接
解決方案:

# 增加重試邏輯和更短的超時設(shè)置
async with session.get(
    url,
    timeout=aiohttp.ClientTimeout(total=5, connect=2)  # 更短的連接超時
) as resp:
    pass

7.2 內(nèi)存泄漏問題

表現(xiàn):長時間運(yùn)行后內(nèi)存持續(xù)增長
排查方法:

使用memory_profiler監(jiān)控內(nèi)存變化
確保所有異步資源正確關(guān)閉:

async def safe_fetch():
    session = aiohttp.ClientSession()
    try:
        async with session.get(url) as resp:
            return await resp.text()
    finally:
        await session.close()  # 確保關(guān)閉會話

7.3 DNS解析失敗

解決方案:

# 使用自定義DNS解析器
import aiodns
 
resolver = aiodns.DNSResolver()
connector = aiohttp.TCPConnector(
    resolver=resolver,
    family=socket.AF_INET  # 強(qiáng)制使用IPv4
)

八、未來發(fā)展趨勢

8.1 HTTP/3支持
aiohttp 4.0+版本已開始支持QUIC協(xié)議,可帶來:

  • 連接建立速度提升3倍
  • 丟包恢復(fù)能力增強(qiáng)
  • 頭部壓縮減少開銷

8.2 AI驅(qū)動的爬蟲
結(jié)合機(jī)器學(xué)習(xí)實現(xiàn):

  • 自動識別反爬策略
  • 動態(tài)調(diào)整抓取頻率
  • 智能解析非結(jié)構(gòu)化數(shù)據(jù)

結(jié)語

從基礎(chǔ)并發(fā)控制到分布式架構(gòu)設(shè)計,aiohttp為構(gòu)建高性能爬蟲提供了完整的解決方案。通過合理設(shè)置信號量、連接池和異常處理機(jī)制,可在保證服務(wù)穩(wěn)定性的前提下實現(xiàn)每秒數(shù)百次的請求吞吐。實際開發(fā)中,建議遵循"漸進(jìn)式優(yōu)化"原則:先實現(xiàn)基礎(chǔ)功能,再逐步添加代理池、分布式等高級特性。記?。簝?yōu)秀的爬蟲不僅是技術(shù)實現(xiàn),更是對目標(biāo)網(wǎng)站服務(wù)條款的尊重和對網(wǎng)絡(luò)禮儀的遵守。

到此這篇關(guān)于python中aiohttp異步高并發(fā)爬蟲實戰(zhàn)代碼指南的文章就介紹到這了,更多相關(guān)python中aiohttp高并發(fā)爬蟲內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python爬蟲的一個常見簡單js反爬詳解

    python爬蟲的一個常見簡單js反爬詳解

    這篇文章主要介紹了python爬蟲的一個常見簡單js反爬詳解我們在寫爬蟲是遇到最多的應(yīng)該就是js反爬了,今天分享一個比較常見的js反爬,我把js反爬分為參數(shù)由js加密生成和js生成cookie等來操作瀏覽器這兩部分,需要的朋友可以參考下
    2019-07-07
  • Anaconda的安裝與虛擬環(huán)境建立

    Anaconda的安裝與虛擬環(huán)境建立

    這篇文章主要介紹了Anaconda的安裝與虛擬環(huán)境建立
    2020-11-11
  • Python中如何保留并查看關(guān)鍵字

    Python中如何保留并查看關(guān)鍵字

    保留關(guān)鍵字是Python語言中具有特殊含義和功能的詞匯,這些詞匯構(gòu)成了Python的語法基礎(chǔ),下面就跟隨小編一起來了解下Python中如何保留和查看這些關(guān)鍵字吧
    2025-04-04
  • Python中使用bidict模塊雙向字典結(jié)構(gòu)的奇技淫巧

    Python中使用bidict模塊雙向字典結(jié)構(gòu)的奇技淫巧

    bidict模塊通過一對一映射結(jié)構(gòu)的處理為Pyhton帶來雙向字典,能夠更加利用Python的切片功能,這里我們就來學(xué)習(xí)Python中使用bidict模塊雙向字典結(jié)構(gòu)的奇技淫巧:
    2016-07-07
  • 使用seaborn繪制強(qiáng)化學(xué)習(xí)中的圖片問題

    使用seaborn繪制強(qiáng)化學(xué)習(xí)中的圖片問題

    這篇文章主要介紹了使用seaborn繪制強(qiáng)化學(xué)習(xí)中的圖片問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-01-01
  • Python如何通過內(nèi)存管理提升程序執(zhí)行效率

    Python如何通過內(nèi)存管理提升程序執(zhí)行效率

    Python提供了自動內(nèi)存管理的功能,但是如果不小心使用,可能會導(dǎo)致內(nèi)存泄漏和性能問題,所以巧妙使用內(nèi)存管理是提高Python執(zhí)行效率的關(guān)鍵,下面就來和大家仔細(xì)講講Python的內(nèi)存管理技巧吧
    2023-06-06
  • Diango + uwsgi + nginx項目部署的全過程(可外網(wǎng)訪問)

    Diango + uwsgi + nginx項目部署的全過程(可外網(wǎng)訪問)

    這篇文章主要給大家介紹了關(guān)于Diango + uwsgi + nginx項目部署的全過程(可外網(wǎng)訪問),文中通過示例代碼將部署的過程介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-04-04
  • python自動化操作之動態(tài)驗證碼、滑動驗證碼的降噪和識別

    python自動化操作之動態(tài)驗證碼、滑動驗證碼的降噪和識別

    很多網(wǎng)站登錄都需要輸入驗證碼,如果要實現(xiàn)自動登錄就不可避免的要識別驗證碼,下面這篇文章主要給大家介紹了關(guān)于python自動化操作之動態(tài)驗證碼、滑動驗證碼的降噪和識別,需要的朋友可以參考下
    2021-08-08
  • 在Windows中定時執(zhí)行Python腳本的詳細(xì)教程

    在Windows中定時執(zhí)行Python腳本的詳細(xì)教程

    在Windows系統(tǒng)中,定時執(zhí)行Python腳本是一個常見需求,特別是在需要自動化數(shù)據(jù)處理、監(jiān)控任務(wù)或周期性維護(hù)等場景中,本文將結(jié)合實際案例,詳細(xì)介紹如何在Windows中通過任務(wù)計劃程序(Task Scheduler)來實現(xiàn)定時執(zhí)行Python腳本的功能,需要的朋友可以參考下
    2024-08-08
  • python使用xlrd實現(xiàn)檢索excel中某列含有指定字符串記錄的方法

    python使用xlrd實現(xiàn)檢索excel中某列含有指定字符串記錄的方法

    這篇文章主要介紹了python使用xlrd實現(xiàn)檢索excel中某列含有指定字符串記錄的方法,涉及Python使用xlrd模塊檢索Excel的技巧,非常具有實用價值,需要的朋友可以參考下
    2015-05-05

最新評論