Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解
Pipeline處理CPU密集型或阻塞型操作
Twisted框架的reactor適合于處理短的、非阻塞的操作。但是如果要處理一些復雜的、或者包含阻塞的操作又該怎么辦呢?Twisted提供了線程池來在其他的線程而不是主線程(Twisted的reactor線程)中執(zhí)行慢的操作——使用reactor.callInThread() API。這就意味著reactor在執(zhí)行計算時還能保持運行并對事件做出反應。一定要記住線程池中的處理不是線程安全的。這就意味著當你使用了全局的狀態(tài)之后,還要面臨所有那些傳統(tǒng)的多線程編程的同步問題。下面是一個簡單的例子:
class UsingBlocking(object): @defer.inlineCallbacks def process_item(self, item, spider): price = item["price"][0] out = defer.Deferred() reactor.callInThread(self._do_calculation, price, out) item["price"][0] = yield out defer.returnValue(item) def _do_calculation(self, price, out): new_price = price + 1 time.sleep(0.10) reactor.callFromThread(out.callback, new_price)
在上面的Pipeline中,對于每個Item,我們提取出它的price字段,想要在_do_caculation()方法中對它進行處理。這個方法使用了time.sleep(),一個阻塞的操作。我們調用reactor.callInThread()方法使它運行在另一個線程中,該方法的第一個參數(shù)是想要調用的函數(shù),后面的參數(shù)則會全部傳遞給被調用的函數(shù)作為參數(shù)。在這里我們給被調用的函數(shù)傳遞了price,還有一個創(chuàng)建的Deferred對象out。當_do_caculation()函數(shù)完成計算后,我們會使用out的回調函數(shù)來返回這個值。接下來,yield這個 Deferred對象并為price設置一個新的值,最后返回Item。
在_do_caculation()函數(shù)中我們把price加一,然后休眠了100ms。其實這個時間是很長的,如果在reactor的線程中調用這個函數(shù),那就意味著我們每秒只能處理不超過10個頁面。不過如果把它放在另一個線程中來調用就不會出現(xiàn)這種問題了。這些計算任務會在線程池中排隊,等待某個線程處于可用狀態(tài),然后這個線程就會執(zhí)行這個任務,休眠100ms。最后一步是激活out的回調函數(shù)。通常情況下,我們可以這樣來激活:out.callback(new_price),但是既然現(xiàn)在我們處于另外一個線程中,這樣做就不安全了。如果我們執(zhí)意這樣做了,這個Deferred對象的代碼,也就是Scrapy的功能就會在別的線程中執(zhí)行,這樣會導致數(shù)據(jù)被損壞。所以我們調用了reactor.callFromThread()函數(shù),同樣的,它也是以一個函數(shù)作為參數(shù),并把額外的參數(shù)直接傳遞給被調用的函數(shù)。這個函數(shù)會在主線程中排隊并等待被調用,它反過來解鎖了process_item()方法中的yield語句,并恢復Scrapy對這個Item的操作。
如果我們的pipeline中含有全局狀態(tài)會怎么樣呢?比如,計數(shù)器或者平均值等,我們需要在_do_caculation()函數(shù)中使用的。例如有以下兩個變量,beta和delta:
class UsingBlocking(object): def __init__(self): self.beta, self.delta = 0, 0 ... def _do_calculation(self, price, out): self.beta += 1 time.sleep(0.001) self.delta += 1 new_price = price + self.beta - self.delta + 1 assert abs(new_price-price-1) < 0.01 time.sleep(0.10)...
上面的代碼有一些問題,并且在運行的時候會給出assertion錯誤。這是因為,如果一個線程在self.beta += 1和self.delta += 1語句之間切換的話,另一個線程就會恢復執(zhí)行并使用beta和delta的值來計算price,這里線程會發(fā)現(xiàn)這兩個值處于不一致的狀態(tài)(beta比delta大),這樣,錯誤的產生了。中間短的sleep會讓線程切換更可能發(fā)生,不過即使沒有它,同樣也會出現(xiàn)競態(tài)條件。為了阻止競態(tài)條件的發(fā)生,我們必須使用鎖,例如Python的threading.RLock()鎖。使用了這個遞歸鎖,就能確保兩個線程不會同時執(zhí)行鎖保護的臨界區(qū)的代碼:
class UsingBlocking(object): def __init__(self): ... self.lock = threading.RLock() ... def _do_calculation(self, price, out): with self.lock: self.beta += 1 ... new_price = price + self.beta - self.delta + 1 assert abs(new_price-price-1) < 0.01 ...
現(xiàn)在的代碼就沒問題了,要注意的是,我們不需要保護整個代碼,只需要能夠覆蓋全局狀態(tài)的使用就行了。
在ITEM_PIPELINES中加上:
ITEM_PIPELINES = { ... 'properties.pipelines.computation.UsingBlocking': 500, }
運行一下會發(fā)現(xiàn),時延由于100ms的休眠的緣故變調了,不過吞吐量還是保持不變,大約每秒25個。
到此這篇關于Scrapy的Pipeline之處理CPU密集型或阻塞型操作詳解的文章就介紹到這了,更多相關Pipeline處理CPU密集型或阻塞型操作內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
windows系統(tǒng)上通過whl文件安裝triton模塊的簡單步驟
這篇文章主要介紹了在Windows系統(tǒng)中通過.whl文件安裝Triton的步驟,包括確認系統(tǒng)環(huán)境、下載合適的.whl文件、使用pip安裝、驗證安裝、使用Triton以及解決潛在問題,需要的朋友可以參考下2025-01-01python pyautogui手動活動(模擬鼠標鍵盤)自動化庫使用
這篇文章主要為大家介紹了python pyautogui手動活動(模擬鼠標鍵盤)自動化庫使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01淺談Python 字符串格式化輸出(format/printf)
下面小編就為大家?guī)硪黄獪\談Python 字符串格式化輸出(format/printf)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-07-07