欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python實現(xiàn)的異步代理爬蟲及代理池

 更新時間:2017年03月17日 09:38:56   作者:xmwd  
本文主要介紹了Python實現(xiàn)異步代理爬蟲及代理池的相關知識,具有很好的參考價值,下面跟著小編一起來看下吧

使用python asyncio實現(xiàn)了一個異步代理池,根據規(guī)則爬取代理網站上的免費代理,在驗證其有效后存入redis中,定期擴展代理的數量并檢驗池中代理的有效性,移除失效的代理。同時用aiohttp實現(xiàn)了一個server,其他的程序可以通過訪問相應的url來從代理池中獲取代理。

源碼

Github

環(huán)境

  • Python 3.5+
  • Redis
  • PhantomJS(可選)
  • Supervisord(可選)

因為代碼中大量使用了asyncio的async和await語法,它們是在Python3.5中才提供的,所以最好使用Python3.5及以上的版本,我使用的是Python3.6。

依賴

  • redis
  • aiohttp
  • bs4
  • lxml
  • requests
  • selenium

selenium包主要是用來操作PhantomJS的。

下面來對代碼進行說明。

1. 爬蟲部分

核心代碼

async def start(self):
 for rule in self._rules:
 parser = asyncio.ensure_future(self._parse_page(rule)) # 根據規(guī)則解析頁面來獲取代理
 logger.debug('{0} crawler started'.format(rule.__rule_name__))
 if not rule.use_phantomjs:
  await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理網站的頁面
 else:
  await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,
rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取
 await self._pages.join()
 parser.cancel()
 logger.debug('{0} crawler finished'.format(rule.__rule_name__))

上面的核心代碼實際上是一個用asyncio.Queue實現(xiàn)的生產-消費者模型,下面是該模型的一個簡單實現(xiàn):

import asyncio
from random import random
async def produce(queue, n):
 for x in range(1, n + 1):
 print('produce ', x)
 await asyncio.sleep(random())
 await queue.put(x) # 向queue中放入item
async def consume(queue):
 while 1:
 item = await queue.get() # 等待從queue中獲取item
 print('consume ', item)
 await asyncio.sleep(random())
 queue.task_done() # 通知queue當前item處理完畢 
async def run(n):
 queue = asyncio.Queue()
 consumer = asyncio.ensure_future(consume(queue))
 await produce(queue, n) # 等待生產者結束
 await queue.join() # 阻塞直到queue不為空
 consumer.cancel() # 取消消費者任務,否則它會一直阻塞在get方法處
def aio_queue_run(n):
 loop = asyncio.get_event_loop()
 try:
 loop.run_until_complete(run(n)) # 持續(xù)運行event loop直到任務run(n)結束
 finally:
 loop.close()
if __name__ == '__main__':
 aio_queue_run(5)

運行上面的代碼,一種可能的輸出如下:

produce 1
produce 2
consume 1
produce 3
produce 4
consume 2
produce 5
consume 3
consume 4
consume 5

爬取頁面

async def page_download(urls, pages, flag):
 url_generator = urls
 async with aiohttp.ClientSession() as session:
 for url in url_generator:
  if flag.is_set():
  break
  await asyncio.sleep(uniform(delay - 0.5, delay + 1))
  logger.debug('crawling proxy web page {0}'.format(url))
  try:
  async with session.get(url, headers=headers, timeout=10) as response:
   page = await response.text()
   parsed = html.fromstring(decode_html(page)) # 使用bs4來輔助lxml解碼網頁:http://lxml.de/elementsoup.html#Using only the encoding detection
   await pages.put(parsed)
   url_generator.send(parsed) # 根據當前頁面來獲取下一頁的地址
  except StopIteration:
  break
  except asyncio.TimeoutError:
  logger.error('crawling {0} timeout'.format(url))
  continue # TODO: use a proxy
  except Exception as e:
  logger.error(e)

使用aiohttp實現(xiàn)的網頁爬取函數,大部分代理網站都可以使用上面的方法來爬取,對于使用js動態(tài)生成頁面的網站可以使用selenium控制PhantomJS來爬取——本項目對爬蟲的效率要求不高,代理網站的更新頻率是有限的,不需要頻繁的爬取,完全可以使用PhantomJS。

解析代理

最簡單的莫過于用xpath來解析代理了,使用Chrome瀏覽器的話,直接通過右鍵就能獲得選中的頁面元素的xpath:

 

安裝Chrome的擴展“XPath Helper”就可以直接在頁面上運行和調試xpath,十分方便:

 

BeautifulSoup不支持xpath,使用lxml來解析頁面,代碼如下:

async def _parse_proxy(self, rule, page):
 ips = page.xpath(rule.ip_xpath) # 根據xpath解析得到list類型的ip地址集合
 ports = page.xpath(rule.port_xpath) # 根據xpath解析得到list類型的ip地址集合
 if not ips or not ports:
 logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'.
  format(len(ips), len(ports), rule.__rule_name__))
 return
 proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports)
 if rule.filters: # 根據過濾字段來過濾代理,如“高匿”、“透明”等
 filters = []
 for i, ft in enumerate(rule.filters_xpath):
  field = page.xpath(ft)
  if not field:
  logger.warning('{1} crawler could not get {0} field, please check the filter xpath'.
   format(rule.filters[i], rule.__rule_name__))
  continue
  filters.append(map(lambda x: x.text.strip(), field))
 filters = zip(*filters)
 selector = map(lambda x: x == rule.filters, filters)
 proxies = compress(proxies, selector)
for proxy in proxies:
await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中

爬蟲規(guī)則

網站爬取、代理解析、濾等等操作的規(guī)則都是由各個代理網站的規(guī)則類定義的,使用元類和基類來管理規(guī)則類。基類定義如下:

class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta):
 start_url = None
 page_count = 0
 urls_format = None
 next_page_xpath = None
 next_page_host = ''
 use_phantomjs = False
 phantomjs_load_flag = None
 filters = ()
 ip_xpath = None
 port_xpath = None
 filters_xpath = ()

各個參數的含義如下:

start_url(必需)

爬蟲的起始頁面。

ip_xpath(必需)

爬取IP的xpath規(guī)則。

port_xpath(必需)

爬取端口號的xpath規(guī)則。

page_count

爬取的頁面數量。

urls_format

頁面地址的格式字符串,通過urls_format.format(start_url, n)來生成第n頁的地址,這是比較常見的頁面地址格式。

next_page_xpathnext_page_host

由xpath規(guī)則來獲取下一頁的url(常見的是相對路徑),結合host得到下一頁的地址:next_page_host + url。

use_phantomjs, phantomjs_load_flag

use_phantomjs用于標識爬取該網站是否需要使用PhantomJS,若使用,需定義phantomjs_load_flag(網頁上的某個元素,str類型)作為PhantomJS頁面加載完畢的標志。

filters

過濾字段集合,可迭代類型。用于過濾代理。

爬取各個過濾字段的xpath規(guī)則,與過濾字段按順序一一對應。

元類CrawlerRuleMeta用于管理規(guī)則類的定義,如:如果定義use_phantomjs=True,則必須定義phantomjs_load_flag,否則會拋出異常,不在此贅述。

目前已經實現(xiàn)的規(guī)則有西刺代理、快代理、360代理、66代理和 秘密代理。新增規(guī)則類也很簡單,通過繼承CrawlerRuleBase來定義新的規(guī)則類YourRuleClass,放在proxypool/rules目錄下,并在該目錄下的__init__.py中添加from . import YourRuleClass(這樣通過CrawlerRuleBase.__subclasses__()就可以獲取全部的規(guī)則類了),重啟正在運行的proxy pool即可應用新的規(guī)則。

2. 檢驗部分

免費的代理雖然多,但是可用的卻不多,所以爬取到代理后需要對其進行檢驗,有效的代理才能放入代理池中,而代理也是有時效性的,還要定期對池中的代理進行檢驗,及時移除失效的代理。

這部分就很簡單了,使用aiohttp通過代理來訪問某個網站,若超時,則說明代理無效。

async def validate(self, proxies):
 logger.debug('validator started')
 while 1:
 proxy = await proxies.get()
 async with aiohttp.ClientSession() as session:
  try:
  real_proxy = 'http://' + proxy
  async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp:
   self._conn.put(proxy)
  except Exception as e:
  logger.error(e)
 proxies.task_done()

3. server部分

使用aiohttp實現(xiàn)了一個web server,啟動后,訪問http://host:port即可顯示主頁:

因為主頁是一個靜態(tài)的html頁面,為避免每來一個訪問主頁的請求都要打開、讀取以及關閉該html文件的開銷,將其緩存到了redis中,通過html文件的修改時間來判斷其是否被修改過,如果修改時間與redis緩存的修改時間不同,則認為html文件被修改了,則重新讀取文件,并更新緩存,否則從redis中獲取主頁的內容。

返回代理是通過aiohttp.web.Response(text=ip.decode('utf-8'))實現(xiàn)的,text要求str類型,而從redis中獲取到的是bytes類型,需要進行轉換。返回的多個代理,使用eval即可轉換為list類型。

返回主頁則不同,是通過aiohttp.web.Response(body=main_page_cache, content_type='text/html') ,這里body要求的是bytes類型,直接將從redis獲取的緩存返回即可,conten_type='text/html'必不可少,否則無法通過瀏覽器加載主頁,而是會將主頁下載下來——在運行官方文檔中的示例代碼的時候也要注意這點,那些示例代碼基本上都沒有設置content_type。

這部分不復雜,注意上面提到的幾點,而關于主頁使用的靜態(tài)資源文件的路徑,可以參考之前的博客《aiohttp之添加靜態(tài)資源路徑》。

4. 運行

將整個代理池的功能分成了3個獨立的部分:

proxypool

定期檢查代理池容量,若低于下限則啟動代理爬蟲并對代理檢驗,通過檢驗的爬蟲放入代理池,達到規(guī)定的數量則停止爬蟲。

proxyvalidator

用于定期檢驗代理池中的代理,移除失效代理。

proxyserver

啟動server。

這3個獨立的任務通過3個進程來運行,在Linux下可以使用supervisod來=管理這些進程,下面是supervisord的配置文件示例:

; supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock 

[inet_http_server]  
port=127.0.0.1:9001 

[supervisord]
logfile=/tmp/supervisord.log 
logfile_maxbytes=5MB 
logfile_backups=10  
loglevel=debug  
pidfile=/tmp/supervisord.pid 
nodaemon=false  
minfds=1024   
minprocs=200   

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock

[program:proxyPool]
command=python /path/to/ProxyPool/run_proxypool.py  
redirect_stderr=true
stdout_logfile=NONE

[program:proxyValidator]
command=python /path/to/ProxyPool/run_proxyvalidator.py
redirect_stderr=true  
stdout_logfile=NONE

[program:proxyServer]
command=python /path/to/ProxyPool/run_proxyserver.py
autostart=false
redirect_stderr=true  
stdout_logfile=NONE

因為項目自身已經配置了日志,所以這里就不需要再用supervisord捕獲stdout和stderr了。通過supervisord -c supervisord.conf啟動supervisord,proxyPool和proxyServer則會隨之自動啟動,proxyServer需要手動啟動,訪問http://127.0.0.1:9001即可通過網頁來管理這3個進程了:

supervisod的官方文檔說目前(版本3.3.1)不支持python3,但是我在使用過程中沒有發(fā)現(xiàn)什么問題,可能也是由于我并沒有使用supervisord的復雜功能,只是把它當作了一個簡單的進程狀態(tài)監(jiān)控和啟停工具了。

以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持腳本之家!

相關文章

最新評論