基于asyncio 異步協(xié)程框架實現(xiàn)收集B站直播彈幕
前言
雖然標題是全站,但目前只做了等級 top 100 直播間的全天彈幕收集。
彈幕收集系統(tǒng)基于之前的B 站直播彈幕姬 Python 版修改而來。具體協(xié)議分析可以看上一篇文章。
直播彈幕協(xié)議是直接基于 TCP 協(xié)議,所以如果 B 站對類似我這種行為做反制措施,比較困難。應該有我不知道的技術手段來檢測類似我這種惡意行為。
我試過同時連接 100 個房間,和連接單個房間 100 次的實驗,都沒有問題。>150 會被關閉鏈接。
直播間的選取
現(xiàn)在彈幕收集系統(tǒng)在選取直播間上比較簡單,直接選取了等級 top100。
以后會修改這部分,改成定時去 http://live.bilibili.com/all 查看新開播的直播間,并動態(tài)添加任務。
異步任務和彈幕存儲
收集系統(tǒng)仍舊使用了 asyncio 異步協(xié)程框架,對于每一個直播間都使用如下方法來加進 loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其實若將心跳任務 HeartbeatLoop 放入 connectorServer 中去啟動,代碼看起來更優(yōu)雅一些。但這么做是因為我需要維護一個任務列表,后面會有描述。
在彈幕存儲上我花了些時間選擇。
數(shù)據(jù)庫存儲是一個同步 IO 的過程,Insert 的時候會阻塞彈幕收集的任務。雖然有 aiomysql 這種異步接口,但配置數(shù)據(jù)庫太麻煩,我的設想是這個小系統(tǒng)能夠方便地部署。
最終我選擇使用自帶的 sqlite3。但 sqlite3 無法做并行操作,故開了一個線程單獨進行數(shù)據(jù)庫存儲。在另一個線程中,100 * 2 個任務搜集所有的彈幕、人數(shù)信息,并塞進隊列 commentq, numq 中。存儲線程每隔 10s 喚醒一次,將隊列中的數(shù)據(jù)寫進 sqlite3 中,并清空隊列。
在多線程和異步的配合下,網(wǎng)絡流量沒有被阻塞。
可能的連接失敗場景處理
彈幕協(xié)議是直接基于 TCP,位與位直接關聯(lián)性較強,一旦解析錯誤,很容易就拋 Exception(個人感覺,雖然 TCP 是可靠傳輸,但B站服務器自身發(fā)生錯誤也是有可能的)。所以有必要設計一個自動重連機制。
在 asyncio 文檔中提到,
Done means either that a result / exception are available, or that the future was cancelled.
函數(shù)正常返回、拋出異?;蛘呤潜?cancel,都會退出當前任務。可以使用 done() 來判斷。
每一個直播間對應兩個任務,解析任務是最容易掛的,但并不會影響心跳任務,所以必須找出并將對應心跳任務結束。
在創(chuàng)建任務的時候使用字典記錄每個房間的兩個任務,
self.tasks[url] = [task1, task2]
在運行過程中,每隔 10s 做一次檢查,
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]
實際我只見過一次任務失敗的場景,是因為主播房間被封了,導致無法進入直播間。
結論
- B站人數(shù)是按照連接彈幕服務器的鏈接數(shù)量統(tǒng)計的。通過操縱鏈接量,可以瞬間增加任意人數(shù)觀看,有商機?
- 運行的這幾天中,發(fā)現(xiàn)即使大部分房間不在直播,也能有 >5 的人數(shù),包括凌晨。我只能猜測也有和我一樣的人在 24h 收集彈幕。
- top100 平均一天 40M 彈幕數(shù)據(jù)。
- 收集的彈幕能做什么?還沒想好,可能可以拿來做用戶行為分析 -_^
最后附上本源碼的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector
相關文章
python 實現(xiàn)學生信息管理系統(tǒng)的示例
本篇文章主要分享python學生管理系統(tǒng)的使用,文章非常詳細地介紹了通過示例代碼實現(xiàn)的學生管理系統(tǒng),該系統(tǒng)對每個人的研究或工作都有一定的參考學習價值,希望你能在其中有所收獲。2020-11-11django restframework序列化字段校驗規(guī)則
本文主要介紹了django restframework序列化字段校驗規(guī)則,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-05-05python消費kafka數(shù)據(jù)批量插入到es的方法
今天小編就為大家分享一篇python消費kafka數(shù)據(jù)批量插入到es的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12Python爬蟲爬取微博熱搜保存為 Markdown 文件的源碼
這篇文章主要介紹了Python爬蟲爬取微博熱搜保存為 Markdown 文件,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-02-02Python如何運用pyaudio庫去做一個固定采樣率音頻錄制器
這篇文章主要介紹了Python如何運用pyaudio庫去做一個固定采樣率音頻錄制器問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05