Python實(shí)現(xiàn)多線程抓取網(wǎng)頁(yè)功能實(shí)例詳解
本文實(shí)例講述了Python實(shí)現(xiàn)多線程抓取網(wǎng)頁(yè)功能。分享給大家供大家參考,具體如下:
最近,一直在做網(wǎng)絡(luò)爬蟲(chóng)相關(guān)的東西。 看了一下開(kāi)源C++寫(xiě)的larbin爬蟲(chóng),仔細(xì)閱讀了里面的設(shè)計(jì)思想和一些關(guān)鍵技術(shù)的實(shí)現(xiàn)。
1、larbin的URL去重用的很高效的bloom filter算法;
2、DNS處理,使用的adns異步的開(kāi)源組件;
3、對(duì)于url隊(duì)列的處理,則是用部分緩存到內(nèi)存,部分寫(xiě)入文件的策略。
4、larbin對(duì)文件的相關(guān)操作做了很多工作
5、在larbin里有連接池,通過(guò)創(chuàng)建套接字,向目標(biāo)站點(diǎn)發(fā)送HTTP協(xié)議中GET方法,獲取內(nèi)容,再解析header之類(lèi)的東西
6、大量描述字,通過(guò)poll方法進(jìn)行I/O復(fù)用,很高效
7、larbin可配置性很強(qiáng)
8、作者所使用的大量數(shù)據(jù)結(jié)構(gòu)都是自己從最底層寫(xiě)起的,基本沒(méi)用STL之類(lèi)的東西
......
還有很多,以后有時(shí)間在好好寫(xiě)篇文章,總結(jié)下。
這兩天,用python寫(xiě)了個(gè)多線程下載頁(yè)面的程序,對(duì)于I/O密集的應(yīng)用而言,多線程顯然是個(gè)很好的解決方案。剛剛寫(xiě)過(guò)的線程池,也正好可以利用上了。其實(shí)用python爬取頁(yè)面非常簡(jiǎn)單,有個(gè)urllib2的模塊,使用起來(lái)很方便,基本兩三行代碼就可以搞定。雖然使用第三方模塊,可以很方便的解決問(wèn)題,但是對(duì)個(gè)人的技術(shù)積累而言沒(méi)有什么好處,因?yàn)殛P(guān)鍵的算法都是別人實(shí)現(xiàn)的,而不是你自己實(shí)現(xiàn)的,很多細(xì)節(jié)的東西,你根本就無(wú)法了解。 我們做技術(shù)的,不能一味的只是用別人寫(xiě)好的模塊或是api,要自己動(dòng)手實(shí)現(xiàn),才能讓自己學(xué)習(xí)得更多。
我決定從socket寫(xiě)起,也是去封裝GET協(xié)議,解析header,而且還可以把DNS的解析過(guò)程單獨(dú)處理,例如DNS緩存一下,所以這樣自己寫(xiě)的話,可控性更強(qiáng),更有利于擴(kuò)展。對(duì)于timeout的處理,我用的全局的5秒鐘的超時(shí)處理,對(duì)于重定位(301or302)的處理是,最多重定位3次,因?yàn)橹皽y(cè)試過(guò)程中,發(fā)現(xiàn)很多站點(diǎn)的重定位又定位到自己,這樣就無(wú)限循環(huán)了,所以設(shè)置了上限。具體原理,比較簡(jiǎn)單,直接看代碼就好了。
自己寫(xiě)完之后,與urllib2進(jìn)行了下性能對(duì)比,自己寫(xiě)的效率還是比較高的,而且urllib2的錯(cuò)誤率稍高一些,不知道為什么。網(wǎng)上有人說(shuō)urllib2在多線程背景下有些小問(wèn)題,具體我也不是特別清楚。
先貼代碼:
fetchPage.py 使用Http協(xié)議的Get方法,進(jìn)行頁(yè)面下載,并存儲(chǔ)為文件
''' Created on 2012-3-13 Get Page using GET method Default using HTTP Protocol , http port 80 @author: xiaojay ''' import socket import statistics import datetime import threading socket.setdefaulttimeout(statistics.timeout) class Error404(Exception): '''Can not find the page.''' pass class ErrorOther(Exception): '''Some other exception''' def __init__(self,code): #print 'Code :',code pass class ErrorTryTooManyTimes(Exception): '''try too many times''' pass def downPage(hostname ,filename , trytimes=0): try : #To avoid too many tries .Try times can not be more than max_try_times if trytimes >= statistics.max_try_times : raise ErrorTryTooManyTimes except ErrorTryTooManyTimes : return statistics.RESULTTRYTOOMANY,hostname+filename try: s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #DNS cache if statistics.DNSCache.has_key(hostname): addr = statistics.DNSCache[hostname] else: addr = socket.gethostbyname(hostname) statistics.DNSCache[hostname] = addr #connect to http server ,default port 80 s.connect((addr,80)) msg = 'GET '+filename+' HTTP/1.0\r\n' msg += 'Host: '+hostname+'\r\n' msg += 'User-Agent:xiaojay\r\n\r\n' code = '' f = None s.sendall(msg) first = True while True: msg = s.recv(40960) if not len(msg): if f!=None: f.flush() f.close() break # Head information must be in the first recv buffer if first: first = False headpos = msg.index("\r\n\r\n") code,other = dealwithHead(msg[:headpos]) if code=='200': #statistics.fetched_url += 1 f = open('pages/'+str(abs(hash(hostname+filename))),'w') f.writelines(msg[headpos+4:]) elif code=='301' or code=='302': #if code is 301 or 302 , try down again using redirect location if other.startswith("http") : hname, fname = parse(other) downPage(hname,fname,trytimes+1)#try again else : downPage(hostname,other,trytimes+1) elif code=='404': raise Error404 else : raise ErrorOther(code) else: if f!=None :f.writelines(msg) s.shutdown(socket.SHUT_RDWR) s.close() return statistics.RESULTFETCHED,hostname+filename except Error404 : return statistics.RESULTCANNOTFIND,hostname+filename except ErrorOther: return statistics.RESULTOTHER,hostname+filename except socket.timeout: return statistics.RESULTTIMEOUT,hostname+filename except Exception, e: return statistics.RESULTOTHER,hostname+filename def dealwithHead(head): '''deal with HTTP HEAD''' lines = head.splitlines() fstline = lines[0] code =fstline.split()[1] if code == '404' : return (code,None) if code == '200' : return (code,None) if code == '301' or code == '302' : for line in lines[1:]: p = line.index(':') key = line[:p] if key=='Location' : return (code,line[p+2:]) return (code,None) def parse(url): '''Parse a url to hostname+filename''' try: u = url.strip().strip('\n').strip('\r').strip('\t') if u.startswith('http://') : u = u[7:] elif u.startswith('https://'): u = u[8:] if u.find(':80')>0 : p = u.index(':80') p2 = p + 3 else: if u.find('/')>0: p = u.index('/') p2 = p else: p = len(u) p2 = -1 hostname = u[:p] if p2>0 : filename = u[p2:] else : filename = '/' return hostname, filename except Exception ,e: print "Parse wrong : " , url print e def PrintDNSCache(): '''print DNS dict''' n = 1 for hostname in statistics.DNSCache.keys(): print n,'\t',hostname, '\t',statistics.DNSCache[hostname] n+=1 def dealwithResult(res,url): '''Deal with the result of downPage''' statistics.total_url+=1 if res==statistics.RESULTFETCHED : statistics.fetched_url+=1 print statistics.total_url , '\t fetched :', url if res==statistics.RESULTCANNOTFIND : statistics.failed_url+=1 print "Error 404 at : ", url if res==statistics.RESULTOTHER : statistics.other_url +=1 print "Error Undefined at : ", url if res==statistics.RESULTTIMEOUT : statistics.timeout_url +=1 print "Timeout ",url if res==statistics.RESULTTRYTOOMANY: statistics.trytoomany_url+=1 print e ,"Try too many times at", url if __name__=='__main__': print 'Get Page using GET method'
下面,我將利用上一篇的線程池作為輔助,實(shí)現(xiàn)多線程下的并行爬取,并用上面自己寫(xiě)的下載頁(yè)面的方法和urllib2進(jìn)行一下性能對(duì)比。
''' Created on 2012-3-16 @author: xiaojay ''' import fetchPage import threadpool import datetime import statistics import urllib2 '''one thread''' def usingOneThread(limit): urlset = open("input.txt","r") start = datetime.datetime.now() for u in urlset: if limit <= 0 : break limit-=1 hostname , filename = parse(u) res= fetchPage.downPage(hostname,filename,0) fetchPage.dealwithResult(res) end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total fetched :', statistics.fetched_url '''threadpoll and GET method''' def callbackfunc(request,result): fetchPage.dealwithResult(result[0],result[1]) def usingThreadpool(limit,num_thread): urlset = open("input.txt","r") start = datetime.datetime.now() main = threadpool.ThreadPool(num_thread) for url in urlset : try : hostname , filename = fetchPage.parse(url) req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc) main.putRequest(req) except Exception: print Exception.message while True: try: main.poll() if statistics.total_url >= limit : break except threadpool.NoResultsPending: print "no pending results" break except Exception ,e: print e end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total url :',statistics.total_url print 'Total fetched :', statistics.fetched_url print 'Lost url :', statistics.total_url - statistics.fetched_url print 'Error 404 :' ,statistics.failed_url print 'Error timeout :',statistics.timeout_url print 'Error Try too many times ' ,statistics.trytoomany_url print 'Error Other faults ',statistics.other_url main.stop() '''threadpool and urllib2 ''' def downPageUsingUrlib2(url): try: req = urllib2.Request(url) fd = urllib2.urlopen(req) f = open("pages3/"+str(abs(hash(url))),'w') f.write(fd.read()) f.flush() f.close() return url ,'success' except Exception: return url , None def writeFile(request,result): statistics.total_url += 1 if result[1]!=None : statistics.fetched_url += 1 print statistics.total_url,'\tfetched :', result[0], else: statistics.failed_url += 1 print statistics.total_url,'\tLost :',result[0], def usingThreadpoolUrllib2(limit,num_thread): urlset = open("input.txt","r") start = datetime.datetime.now() main = threadpool.ThreadPool(num_thread) for url in urlset : try : req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile) main.putRequest(req) except Exception ,e: print e while True: try: main.poll() if statistics.total_url >= limit : break except threadpool.NoResultsPending: print "no pending results" break except Exception ,e: print e end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total url :',statistics.total_url print 'Total fetched :', statistics.fetched_url print 'Lost url :', statistics.total_url - statistics.fetched_url main.stop() if __name__ =='__main__': '''too slow''' #usingOneThread(100) '''use Get method''' #usingThreadpool(3000,50) '''use urllib2''' usingThreadpoolUrllib2(3000,50)
實(shí)驗(yàn)分析:
實(shí)驗(yàn)數(shù)據(jù):larbin抓取下來(lái)的3000條url,經(jīng)過(guò)Mercator隊(duì)列模型(我用c++實(shí)現(xiàn)的,以后有機(jī)會(huì)發(fā)個(gè)blog)處理后的url集合,具有隨機(jī)和代表性。使用50個(gè)線程的線程池。
實(shí)驗(yàn)環(huán)境:ubuntu10.04,網(wǎng)絡(luò)較好,python2.6
存儲(chǔ):小文件,每個(gè)頁(yè)面,一個(gè)文件進(jìn)行存儲(chǔ)
PS:由于學(xué)校上網(wǎng)是按流量收費(fèi)的,做網(wǎng)絡(luò)爬蟲(chóng),灰常費(fèi)流量啊?。?!過(guò)幾天,可能會(huì)做個(gè)大規(guī)模url下載的實(shí)驗(yàn),用個(gè)幾十萬(wàn)的url試試。
實(shí)驗(yàn)結(jié)果:
使用urllib2 ,usingThreadpoolUrllib2(3000,50)
Start at : 2012-03-16 22:18:20.956054
End at : 2012-03-16 22:22:15.203018
Total Cost : 0:03:54.246964
Total url : 3001
Total fetched : 2442
Lost url : 559
下載頁(yè)面的物理存儲(chǔ)大小:84088kb
使用自己的getPageUsingGet ,usingThreadpool(3000,50)
Start at : 2012-03-16 22:23:40.206730
End at : 2012-03-16 22:26:26.843563
Total Cost : 0:02:46.636833
Total url : 3002
Total fetched : 2484
Lost url : 518
Error 404 : 94
Error timeout : 312
Error Try too many times 0
Error Other faults 112
下載頁(yè)面的物理存儲(chǔ)大?。?7168kb
小結(jié): 自己寫(xiě)的下載頁(yè)面程序,效率還是很不錯(cuò)的,而且丟失的頁(yè)面也較少。但其實(shí)自己考慮一下,還是有很多地方可以?xún)?yōu)化的,比如文件過(guò)于分散,過(guò)多的小文件創(chuàng)建和釋放定會(huì)產(chǎn)生不小的性能開(kāi)銷(xiāo),而且程序里用的是hash命名,也會(huì)產(chǎn)生很多的計(jì)算,如果有好的策略,其實(shí)這些開(kāi)銷(xiāo)都是可以省略的。另外DNS,也可以不使用python自帶的DNS解析,因?yàn)槟J(rèn)的DNS解析都是同步的操作,而DNS解析一般比較耗時(shí),可以采取多線程的異步的方式進(jìn)行,再加以適當(dāng)?shù)腄NS緩存很大程度上可以提高效率。不僅如此,在實(shí)際的頁(yè)面抓取過(guò)程中,會(huì)有大量的url ,不可能一次性把它們存入內(nèi)存,而應(yīng)該按照一定的策略或是算法進(jìn)行合理的分配。 總之,采集頁(yè)面要做的東西以及可以?xún)?yōu)化的東西,還有很多很多。
附:demo源碼點(diǎn)擊此處本站下載。
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專(zhuān)題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門(mén)與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
相關(guān)文章
python監(jiān)控網(wǎng)卡流量并使用graphite繪圖的示例
這篇文章主要介紹了python監(jiān)控網(wǎng)卡流量并使用graphite繪圖的示例,需要的朋友可以參考下2014-04-04Windows11使用Cpython?編譯文件報(bào)錯(cuò)?error:?Unable?to?find?vcvars
這篇文章主要介紹了Windows11使用Cpython編譯文件報(bào)錯(cuò)error:Unable?to find?vcvarsall.bat完美解決方法,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05python2.7實(shí)現(xiàn)復(fù)制大量文件及文件夾資料
這篇文章主要為大家詳細(xì)介紹了python2.7實(shí)現(xiàn)復(fù)制大量文件及文件夾資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-08-08PyTorch中torch.matmul()函數(shù)常見(jiàn)用法總結(jié)
torch.matmul()也是一種類(lèi)似于矩陣相乘操作的tensor連乘操作。但是它可以利用python中的廣播機(jī)制,處理一些維度不同的tensor結(jié)構(gòu)進(jìn)行相乘操作,這篇文章主要介紹了PyTorch中torch.matmul()函數(shù)用法總結(jié),需要的朋友可以參考下2023-04-04基于Python實(shí)現(xiàn)貪吃蛇小游戲(附源碼)
本次我們將編寫(xiě)一個(gè)貪吃蛇的游戲。通過(guò)鍵盤(pán)上、下、左、右控制小蛇上、下、左、右移動(dòng),吃到食物后長(zhǎng)度加1;蛇頭碰到自身或窗口邊緣,游戲失敗,需要的可以參考一下2022-11-11pygame用blit()實(shí)現(xiàn)動(dòng)畫(huà)效果的示例代碼
這篇文章主要介紹了pygame用blit()實(shí)現(xiàn)動(dòng)畫(huà)效果的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Python編寫(xiě)一個(gè)優(yōu)美的下載器
這篇文章主要教大家如何使用Python編寫(xiě)一個(gè)優(yōu)美的下載器,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-04-04