以Python的Pyspider為例剖析搜索引擎的網(wǎng)絡(luò)爬蟲實(shí)現(xiàn)方法
在這篇文章中,我們將分析一個網(wǎng)絡(luò)爬蟲。
網(wǎng)絡(luò)爬蟲是一個掃描網(wǎng)絡(luò)內(nèi)容并記錄其有用信息的工具。它能打開一大堆網(wǎng)頁,分析每個頁面的內(nèi)容以便尋找所有感興趣的數(shù)據(jù),并將這些數(shù)據(jù)存儲在一個數(shù)據(jù)庫中,然后對其他網(wǎng)頁進(jìn)行同樣的操作。
如果爬蟲正在分析的網(wǎng)頁中有一些鏈接,那么爬蟲將會根據(jù)這些鏈接分析更多的頁面。
搜索引擎就是基于這樣的原理實(shí)現(xiàn)的。
這篇文章中,我特別選了一個穩(wěn)定的、”年輕”的開源項(xiàng)目pyspider,它是由 binux 編碼實(shí)現(xiàn)的。
注:據(jù)認(rèn)為pyspider持續(xù)監(jiān)控網(wǎng)絡(luò),它假定網(wǎng)頁在一段時間后會發(fā)生變化,因此一段時間后它將會重新訪問相同的網(wǎng)頁。
概述
爬蟲pyspider主要由四個組件組成。包括調(diào)度程序(scheduler),抓取程序(fetcher),內(nèi)容處理程序(processor)以及一個監(jiān)控組件。
調(diào)度程序接受任務(wù)并決定該做什么。這里有幾種可能性,它可以丟棄一個任務(wù)(可能這個特定的網(wǎng)頁剛剛被抓取過了),或者給任務(wù)分配不同的優(yōu)先級。
當(dāng)各個任務(wù)的優(yōu)先級確定之后,它們被傳入抓取程序。它重新抓取網(wǎng)頁。這個過程很復(fù)雜,但邏輯上比較簡單。
當(dāng)網(wǎng)絡(luò)上的資源被抓取下來,內(nèi)容處理程序就負(fù)責(zé)抽取有用的信息。它運(yùn)行一個用戶編寫的Python腳本,這個腳本并不像沙盒一樣被隔離。它的職責(zé)還包括捕獲異?;蛉罩?,并適當(dāng)?shù)毓芾硭鼈儭?/p>
最后,爬蟲pyspider中有一個監(jiān)控組件。
爬蟲pyspider提供一個異常強(qiáng)大的網(wǎng)頁界面(web ui),它允許你編輯和調(diào)試你的腳本,管理整個抓取過程,監(jiān)控正在進(jìn)行的任務(wù),并最終輸出結(jié)果。
項(xiàng)目和任務(wù)
在pyspider中,我們有項(xiàng)目和任務(wù)的概念。
一個任務(wù)指的是一個需要從網(wǎng)站檢索并進(jìn)行分析的單獨(dú)頁面。
一個項(xiàng)目指的是一個更大的實(shí)體,它包括爬蟲涉及到的所有頁面,分析網(wǎng)頁所需要的python腳本,以及用于存儲數(shù)據(jù)的數(shù)據(jù)庫等等。
在pyspider中我們可以同時運(yùn)行多個項(xiàng)目。
代碼結(jié)構(gòu)分析
根目錄
在根目錄中可以找到的文件夾有:
- data,空文件夾,它是存放由爬蟲所生成的數(shù)據(jù)的地方。
- docs,包含該項(xiàng)目文檔,里邊有一些markdown代碼。
- pyspider,包含項(xiàng)目實(shí)際的代碼。
- test,包含相當(dāng)多的測試代碼。
- 這里我將重點(diǎn)介紹一些重要的文件:
- .travis.yml,一個很棒的、連續(xù)性測試的整合。你如何確定你的項(xiàng)目確實(shí)有效?畢竟僅在你自己的帶有固定版本的庫的機(jī)器上進(jìn)行測試是不夠的。
- Dockerfile,同樣很棒的工具!如果我想在我的機(jī)器上嘗試一個項(xiàng)目,我只需要運(yùn)行Docker,我不需要手動安裝任何東西,這是一個使開發(fā)者參與到你的項(xiàng)目中的很好的方式。
- LICENSE,對于任何開源項(xiàng)目都是必需的,(如果你自己有開源項(xiàng)目的話)不要忘記自己項(xiàng)目中的該文件。
- requirements.txt,在Python世界中,該文件用于指明為了運(yùn)行該軟件,需要在你的系統(tǒng)中安裝什么Python包,在任何的Python項(xiàng)目中該文件都是必須的。
- run.py,該軟件的主入口點(diǎn)。
- setup.py,該文件是一個Python腳本,用于在你的系統(tǒng)中安裝pyspider項(xiàng)目。
已經(jīng)分析完項(xiàng)目的根目錄了,僅根目錄就能說明該項(xiàng)目是以一種非常專業(yè)的方式進(jìn)行開發(fā)的。如果你正在開發(fā)任何的開源程序,希望你能達(dá)到這樣的水準(zhǔn)。
文件夾pyspider
讓我們更深入一點(diǎn)兒,一起來分析實(shí)際的代碼。
在這個文件夾中還能找到其他的文件夾,整個軟件背后的邏輯已經(jīng)被分割,以便更容易的進(jìn)行管理和擴(kuò)展。
這些文件夾是:database、fetcher、libs、processor、result、scheduler、webui。
在這個文件夾中我們也能找到整個項(xiàng)目的主入口點(diǎn),run.py。
文件run.py
這個文件首先完成所有必需的雜事,以保證爬蟲成功地運(yùn)行。最終它產(chǎn)生所有必需的計算單元。向下滾動我們可以看到整個項(xiàng)目的入口點(diǎn),cli()。
函數(shù)cli()
這個函數(shù)好像很復(fù)雜,但與我相隨,你會發(fā)現(xiàn)它并沒有你想象中復(fù)雜。函數(shù)cli()的主要目的是創(chuàng)建數(shù)據(jù)庫和消息系統(tǒng)的所有連接。它主要解析命令行參數(shù),并利用所有我們需要的東西創(chuàng)建一個大字典。最后,我們通過調(diào)用函數(shù)all()開始真正的工作。
函數(shù)all()
一個網(wǎng)絡(luò)爬蟲會進(jìn)行大量的IO操作,因此一個好的想法是產(chǎn)生不同的線程或子進(jìn)程來管理所有的這些工作。通過這種方式,你可以在等待網(wǎng)絡(luò)獲取你當(dāng)前html頁面的同時,提取前一個頁面的有用信息。
函數(shù)all()決定是否運(yùn)行子進(jìn)程或者線程,然后調(diào)用不同的線程或子進(jìn)程里的所有的必要函數(shù)。這時pyspider將產(chǎn)生包括webui在內(nèi)的,爬蟲的所有邏輯模塊所需要的,足夠數(shù)量的線程。當(dāng)我們完成項(xiàng)目并關(guān)閉webui時,我們將干凈漂亮地關(guān)閉每一個進(jìn)程。
現(xiàn)在我們的爬蟲就開始運(yùn)行了,讓我們進(jìn)行更深入一點(diǎn)兒的探索。
調(diào)度程序
調(diào)度程序從兩個不同的隊(duì)列中獲取任務(wù)(newtask_queue和status_queue),并把任務(wù)加入到另外一個隊(duì)列(out_queue),這個隊(duì)列稍后會被抓取程序讀取。
調(diào)度程序做的第一件事情是從數(shù)據(jù)庫中加載所需要完成的所有的任務(wù)。之后,它開始一個無限循環(huán)。在這個循環(huán)中會調(diào)用幾個方法:
1._update_projects():嘗試更新的各種設(shè)置,例如,我們想在爬蟲工作的時候調(diào)整爬取速度。
2._check_task_done():分析已完成的任務(wù)并將其保存到數(shù)據(jù)庫,它從status_queue中獲取任務(wù)。
3._check_request():如果內(nèi)容處理程序要求分析更多的頁面,把這些頁面放在隊(duì)列newtask_queue中,該函數(shù)會從該隊(duì)列中獲得新的任務(wù)。
4._check_select():把新的網(wǎng)頁加入到抓取程序的隊(duì)列中。
5._check_delete():刪除已被用戶標(biāo)記的任務(wù)和項(xiàng)目。
6._try_dump_cnt():記錄一個文件中已完成任務(wù)的數(shù)量。對于防止程序異常所導(dǎo)致的數(shù)據(jù)丟失,這是有必要的。
def run(self): while not self._quit: try: time.sleep(self.LOOP_INTERVAL) self._update_projects() self._check_task_done() self._check_request() while self._check_cronjob(): pass self._check_select() self._check_delete() self._try_dump_cnt() self._exceptions = 0 except KeyboardInterrupt: break except Exception as e: logger.exception(e) self._exceptions += 1 if self._exceptions > self.EXCEPTION_LIMIT: break continue
循環(huán)也會檢查運(yùn)行過程中的異常,或者我們是否要求python停止處理。
finally: # exit components run in subprocess for each in threads: if not each.is_alive(): continue if hasattr(each, 'terminate'): each.terminate() each.join()
抓取程序
抓取程序的目的是檢索網(wǎng)絡(luò)資源。
pyspider能夠處理普通HTML文本頁面和基于AJAX的頁面。只有抓取程序能意識到這種差異,了解這一點(diǎn)非常重要。我們將僅專注于普通的html文本抓取,然而大部分的想法可以很容易地移植到Ajax抓取器。
這里的想法在某種形式上類似于調(diào)度程序,我們有分別用于輸入和輸出的兩個隊(duì)列,以及一個大的循環(huán)。對于輸入隊(duì)列中的所有元素,抓取程序生成一個請求,并將結(jié)果放入輸出隊(duì)列中。
它聽起來簡單但有一個大問題。網(wǎng)絡(luò)通常是極其緩慢的,如果因?yàn)榈却粋€網(wǎng)頁而阻止了所有的計算,那么整個過程將會運(yùn)行的極其緩慢。解決方法非常的簡單,即不要在等待網(wǎng)絡(luò)的時候阻塞所有的計算。這個想法即在網(wǎng)絡(luò)上發(fā)送大量消息,并且相當(dāng)一部分消息是同時發(fā)送的,然后異步等待響應(yīng)的返回。一旦我們收回一個響應(yīng),我們將會調(diào)用另外的回調(diào)函數(shù),回調(diào)函數(shù)將會以最適合的方式管理這樣的響應(yīng)。
爬蟲pyspider中的所有的復(fù)雜的異步調(diào)度都是由另一個優(yōu)秀的開源項(xiàng)目
http://www.tornadoweb.org/en/stable/
完成。
現(xiàn)在我們的腦海里已經(jīng)有了極好的想法了,讓我們更深入地探索這是如何實(shí)現(xiàn)的。
def run(self): def queue_loop(): if not self.outqueue or not self.inqueue: return while not self._quit: try: if self.outqueue.full(): break task = self.inqueue.get_nowait() task = utils.decode_unicode_obj(task) self.fetch(task) except queue.Empty: break tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start() self._running = True self.ioloop.start() <strong>
函數(shù)run()</strong>
函數(shù)run()是抓取程序fetcher中的一個大的循環(huán)程序。
函數(shù)run()中定義了另外一個函數(shù)queue_loop(),該函數(shù)接收輸入隊(duì)列中的所有任務(wù),并抓取它們。同時該函數(shù)也監(jiān)聽中斷信號。函數(shù)queue_loop()作為參數(shù)傳遞給tornado的類PeriodicCallback,如你所猜,PeriodicCallback會每隔一段具體的時間調(diào)用一次queue_loop()函數(shù)。函數(shù)queue_loop()也會調(diào)用另一個能使我們更接近于實(shí)際檢索Web資源操作的函數(shù):fetch()。
函數(shù)fetch(self, task, callback=None)
網(wǎng)絡(luò)上的資源必須使用函數(shù)phantomjs_fetch()或簡單的http_fetch()函數(shù)檢索,函數(shù)fetch()只決定檢索該資源的正確方法是什么。接下來我們看一下函數(shù)http_fetch()。
函數(shù)http_fetch(self, url, task, callback)
def http_fetch(self, url, task, callback): '''HTTP fetcher''' fetch = copy.deepcopy(self.default_options) fetch['url'] = url fetch['headers']['User-Agent'] = self.user_agent def handle_response(response): ... return task, result try: request = tornado.httpclient.HTTPRequest(header_callback=header_callback, **fetch) if self.async: self.http_client.fetch(request, handle_response) else: return handle_response(self.http_client.fetch(request))
終于,這里才是完成真正工作的地方。這個函數(shù)的代碼有點(diǎn)長,但有清晰的結(jié)構(gòu),容易閱讀。
在函數(shù)的開始部分,它設(shè)置了抓取請求的header,比如User-Agent、超時timeout等等。然后定義一個處理響應(yīng)response的函數(shù):handle_response(),后邊我們會分析這個函數(shù)。最后我們得到一個tornado的請求對象request,并發(fā)送這個請求對象。請注意在異步和非異步的情況下,是如何使用相同的函數(shù)來處理響應(yīng)response的。
讓我們往回看一下,分析一下函數(shù)handle_response()做了什么。
函數(shù)handle_response(response)
def handle_response(response): result = {} result['orig_url'] = url result['content'] = response.body or '' callback('http', task, result) return task, result
這個函數(shù)以字典的形式保存一個response的所有相關(guān)信息,例如url,狀態(tài)碼和實(shí)際響應(yīng)等,然后調(diào)用回調(diào)函數(shù)。這里的回調(diào)函數(shù)是一個小方法:send_result()。
函數(shù)send_result(self, type, task, result)
def send_result(self, type, task, result): if self.outqueue: self.outqueue.put((task, result))
這個最后的函數(shù)將結(jié)果放入到輸出隊(duì)列中,等待內(nèi)容處理程序processor的讀取。
內(nèi)容處理程序processor
內(nèi)容處理程序的目的是分析已經(jīng)抓取回來的頁面。它的過程同樣也是一個大循環(huán),但輸出中有三個隊(duì)列(status_queue, newtask_queue 以及result_queue)而輸入中只有一個隊(duì)列(inqueue)。
讓我們稍微深入地分析一下函數(shù)run()中的循環(huán)過程。
函數(shù)run(self)
def run(self): try: task, response = self.inqueue.get(timeout=1) self.on_task(task, response) self._exceptions = 0 except KeyboardInterrupt: break except Exception as e: self._exceptions += 1 if self._exceptions > self.EXCEPTION_LIMIT: break continue
這個函數(shù)的代碼比較少,易于理解,它簡單地從隊(duì)列中得到需要被分析的下一個任務(wù),并利用on_task(task, response)函數(shù)對其進(jìn)行分析。這個循環(huán)監(jiān)聽中斷信號,只要我們給Python發(fā)送這樣的信號,這個循環(huán)就會終止。最后這個循環(huán)統(tǒng)計它引發(fā)的異常的數(shù)量,異常數(shù)量過多會終止這個循環(huán)。
函數(shù)on_task(self, task, response)
def on_task(self, task, response): response = rebuild_response(response) project = task['project'] project_data = self.project_manager.get(project, updatetime) ret = project_data['instance'].run( status_pack = { 'taskid': task['taskid'], 'project': task['project'], 'url': task.get('url'), ... } self.status_queue.put(utils.unicode_obj(status_pack)) if ret.follows: self.newtask_queue.put( [utils.unicode_obj(newtask) for newtask in ret.follows]) for project, msg, url in ret.messages: self.inqueue.put(({...},{...})) return True
函數(shù)on_task()是真正干活的方法。
它嘗試?yán)幂斎氲娜蝿?wù)找到任務(wù)所屬的項(xiàng)目。然后它運(yùn)行項(xiàng)目中的定制腳本。最后它分析定制腳本返回的響應(yīng)response。如果一切順利,將會創(chuàng)建一個包含所有我們從網(wǎng)頁上得到的信息的字典。最后將字典放到隊(duì)列status_queue中,稍后它會被調(diào)度程序重新使用。
如果在分析的頁面中有一些新的鏈接需要處理,新鏈接會被放入到隊(duì)列newtask_queue中,并在稍后被調(diào)度程序使用。
現(xiàn)在,如果有需要的話,pyspider會將結(jié)果發(fā)送給其他項(xiàng)目。
最后如果發(fā)生了一些錯誤,像頁面返回錯誤,錯誤信息會被添加到日志中。
結(jié)束!
相關(guān)文章
python3實(shí)現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
今天小編就為大家分享一篇python3實(shí)現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12探索Python?Slice函數(shù)靈活而強(qiáng)大的序列切片技術(shù)
Python中的Slice函數(shù)是一種強(qiáng)大且靈活的序列切片技術(shù),用于從字符串、列表、元組等序列類型中提取子集,本文將深入研究Slice函數(shù)的功能和用法,提供詳細(xì)的示例代碼和解釋,幫助讀者更全面地了解和應(yīng)用這一功能2024-01-01Python模塊Typing.overload的使用場景分析
在 Python 中,typing.overload 是一個用于定義函數(shù)重載的裝飾器,函數(shù)重載是指在一個類中可以定義多個相同名字但參數(shù)不同的函數(shù),使得在調(diào)用函數(shù)時可以根據(jù)參數(shù)的不同選擇不同的函數(shù)執(zhí)行,這篇文章主要介紹了Python模塊Typing.overload的使用,需要的朋友可以參考下2024-02-02Python中列表,元組,字典和集合的區(qū)別及它們之間的轉(zhuǎn)換
這篇文章主要介紹了Python中列表,元組,字典和集合的區(qū)別及它們之間的轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05pandas實(shí)現(xiàn)將日期轉(zhuǎn)換成timestamp
今天小編就為大家分享一篇pandas實(shí)現(xiàn)將日期轉(zhuǎn)換成timestamp,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12詳解Python中的format格式化函數(shù)的使用方法
這篇文章主要介紹了詳解Python中的format格式化函數(shù)的使用方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11