nodejs+socket.io實現(xiàn)p2p消息實時發(fā)送的項目實踐
常見的消息通知:
常見的站內(nèi)通知類別(括號里是對自己目前項目出現(xiàn)情況的分析,讀者忽略):
- 公告 Announcement (通道加入新的組織、某組織或用戶新上傳了某數(shù)據(jù)摘要、系統(tǒng)凌晨需要版本更新等事件)
- 提醒 Remind(用戶之間、系統(tǒng)與用戶之間)
- 資源訂閱提醒(關(guān)注的數(shù)據(jù)摘要更新了內(nèi)容、評論等)
- 資源發(fā)布提醒(我發(fā)布的數(shù)據(jù)摘要被評論了,被關(guān)注了,被申請交易了)
- 系統(tǒng)提醒
- 私信 Mailbox(類似聊天室吧,暫時沒有這需求)
實現(xiàn)思路與步驟等
除了用消息隊列MQ以外,筆者想到的是使用websocket協(xié)議實現(xiàn),該協(xié)議為全雙工通信full-duplex,長連接PersistentConnection,相比http來說是種持久化協(xié)議。
其中主要的開發(fā)步驟有:
- 綁定連接(用戶賬號和websocket之間的連接)
- 管理連接
- 收發(fā)消息(數(shù)據(jù)格式和讀取等具體實現(xiàn))。
其中,需要注意的點有:
- 長連接的心跳激活處理;
- 服務(wù)端調(diào)優(yōu)實現(xiàn)高并發(fā)量client同時在線(單機服務(wù)器可以實現(xiàn)百萬并發(fā)長連接);
- 群發(fā)消息;
- 服務(wù)端維持多用戶的狀態(tài);
- 從WebSocket中獲取HttpSession進行用戶相關(guān)操作等
具體實現(xiàn)思路:
- 前端使用WebSocket與服務(wù)端創(chuàng)建連接的時候,將用戶ID傳給服務(wù)端,服務(wù)端將用戶ID與channel關(guān)聯(lián)起來存儲,同時將channel放入到channel組中。(這里的channel就是服務(wù)器與客戶端之間的連接)
- 如果需要給所有用戶發(fā)送消息,直接執(zhí)行channel組的writeAndFlush()方法;
- 如果需要給指定用戶發(fā)送消息,根據(jù)用戶ID查詢到對應(yīng)的channel,然后執(zhí)行writeAndFlush()方法;
- 前端獲取到服務(wù)端推送的消息之后,將消息內(nèi)容展示到文本域中。
其他方法介紹
輪詢:客戶端定時向服務(wù)器發(fā)送Ajax請求,服務(wù)器接到請求后馬上返回響應(yīng)信息并關(guān)閉連接。 優(yōu)點:后端程序編寫比較容易。 缺點:請求中有大半是無用,浪費帶寬和服務(wù)器資源。 實例:適于小型應(yīng)用。
長輪詢:客戶端向服務(wù)器發(fā)送Ajax請求,服務(wù)器接到請求后hold住連接,直到有新消息才返回響應(yīng)信息并關(guān)閉連接,客戶端處理完響應(yīng)信息后再向服務(wù)器發(fā)送新的請求。 優(yōu)點:在無消息的情況下不會頻繁的請求,耗費資小。 缺點:服務(wù)器hold連接會消耗資源,返回數(shù)據(jù)順序無保證,難于管理維護。 Comet異步的ashx, 實例:WebQQ、Hi網(wǎng)頁版、Facebook IM。
長連接:在頁面里嵌入一個隱蔵iframe,將這個隱蔵iframe的src屬性設(shè)為對一個長連接的請求或是采用xhr請求,服務(wù)器端就能源源不斷地往客戶端輸入數(shù)據(jù)。 優(yōu)點:消息即時到達,不發(fā)無用請求;管理起來也相對便。 缺點:服務(wù)器維護一個長連接會增加開銷。 實例:Gmail聊天
Flash Socket:在頁面中內(nèi)嵌入一個使用了Socket類的 Flash 程序JavaScript通過調(diào)用此Flash程序提供的Socket接口與服務(wù)器端的Socket接口進行通信,JavaScript在收到服務(wù)器端傳送的信息后控制頁面的顯示。 優(yōu)點:實現(xiàn)真正的即時通信,而不是偽即時。 缺點:客戶端必須安裝Flash插件;非HTTP協(xié)議,無法自動穿越防火墻。 實例:網(wǎng)絡(luò)互動游戲。
技術(shù)實現(xiàn)與相關(guān)包介紹
包介紹
nodejs不像其他的服務(wù)器,對于不同的連接,不支持進程和線程操作,寫這類功能的時候就需要找更合適的包。
使用WebSocket協(xié)議的包有好多,這里我先講一種常用的包是nodejs-websocket包,網(wǎng)評說使用較為繁瑣,這里就沒使用。它需要依賴于底層的C++,Python的環(huán)境,支持以node做客戶端的訪問。當然了,這里我一定要說一下,nodejs-websocket是純粹的使用了WebSocket協(xié)議,因此使用時需要寫心跳檢測,檢測用戶是否在線等情況。
我采用的是socket.io,它使用起來較為簡單,功能強大,支持集成websocket服務(wù)器端和Express3框架與一身。它可以不需要心跳檢測,不過這也是個相對說法,因為它結(jié)合封裝了輪詢機制和實時通信,當websocket連接斷掉時,它會不停的嘗試連接,耗費資源。當然了,還有其他庫,比如node-websocket-server(不需要了解,直接放棄)。
技術(shù)實現(xiàn)
在實現(xiàn)前,考慮到發(fā)送消息時,向指定用戶發(fā)送WebSocket消息,但對方可能不在線,這種情況,我這么處理:
- 如果接收者在線,則存儲進redis并實時發(fā)送消息;
- 否則將消息存儲到redis,等用戶登陸上線后主動推送未讀消息。
socket.io的客戶端和服務(wù)端都有兩個函數(shù) on()、emit(),核心函數(shù),可輕松實現(xiàn)客戶端與服務(wù)端的雙向通信。
- emit:觸發(fā)一個事件,第一個參數(shù)是事件名稱,第二個參數(shù)是要發(fā)送到另一端的數(shù)據(jù),第三個參數(shù)是一個回調(diào)函數(shù)用來確認對方的接收信息(也可以說時回執(zhí)),可忽略。
- socket.emit 信息傳輸對象為當前 socket 對應(yīng)的 client ,各個client socket 相互不影響。
- socket.broadcast.emit 信息傳輸對象為所有 client ,排除當前socket 對應(yīng)的 client。
- io.sockets.emit信息傳輸對象為所有 client。
- on:注冊一個事件,用來監(jiān)聽 emit 觸發(fā)的事件。
服務(wù)端
直接上代碼:
'use strict'; // 維護socket連接的代碼 const { addSocketId, getSocketId, deleteSocketId } = require('../../../utils/socket/socketId'); // 保存消息 const message = require('../saveMessage'); // socket連接許可驗證 const { socketAuth } = require('../../../middleware/socket/index') // socket接口,傳入/bin/www.js function init(io) { /** * @description: 為每個傳入執(zhí)行的功能Socket,并且接收套接字和可選地將執(zhí)行延遲到下一個注冊的中間件的參數(shù) */ io.use((socket, next) => { if (socket.request.headers.cookie) return next(); next(new Error('Authentication error')); }); io.on('connection', function(socket) { /** * @description: 用戶登錄,則保存用戶連接的相關(guān)信息,并從redis拉取未讀消息,推送給該用戶 */ socket.on('user_login', function(socketInfo) { if(!socketInfo.userId) { // io.sockets.to(socketInfo['socketId']).emit('disconnect', ''); return; } // 將用戶與socket插入數(shù)據(jù)庫中 addSocketId(socketInfo); if (process.env.NODE_ENV === 'development') { displayUserInfo(socketInfo); }; // 推送所有消息 message.pushMessage(socketInfo['userId']).then(pushData => { io.sockets.to(socketInfo['socketId']).emit('push_message', pushData); }); }); /** * @description: 發(fā)給某用戶交易通知(在線實時通知,并存儲至redis) */ socket.on('todo', function(todoData) { // 存入redis message.addMessage(todoData); // 檢測用戶是否在線 message.isOnline(todoData['receiver_id']).then(isOnline => { // 用戶在線則通信 if (isOnline == true) { getSocketId(todoData['receiver_id']).then(socketId => { io.sockets.to(socketId).emit('todo_message', todoData); }); }; }); }); // TODO: 需要提醒前端在關(guān)閉窗口之前先斷開連接(窗口刷新之前應(yīng)該不需要) /** * @description: 斷開連接 */ socket.on('disconnect', function() { // 從數(shù)據(jù)庫中刪除連接 deleteSocketId(socket.id); // 判斷當前是否是開發(fā)環(huán)境 if (process.env.NODE_ENV === 'development') { displayUserInfo(); } }); }); } function displayUserInfo(user) { console.log(`當前登錄用戶信息:${user}`); return; } module.exports = { init };
上方代碼中,主要創(chuàng)建了connection事件,其下又有user_login、todo、disconnect事件,然后這些事件下又有其創(chuàng)建或監(jiān)聽的事件。其中,user_login事件主要是監(jiān)聽前端用戶的登錄成功,若用戶成功上線,則將redis內(nèi)的已讀未讀消息分類后推送給客戶端。todo事件則是判斷用戶在線后,實時傳遞消息,需要注意使用io.sockets.to(socketId).emit(eventname, eventdata)實現(xiàn)P2P消息傳送,socketId即為接收消息用戶的WebSocket連接的ID??蛻舳藙t需要監(jiān)聽后面emit()參數(shù)中的eventname事件。disconnect事件則是在客戶端用戶登出或刷新頁面等認為是斷開WebSocket連接時,在維護的socket連接組中刪除該用戶的WebSocket連接信息。
當然,在連接到connection事件前,有一個中間件io.use((socket, next) => {},是判斷對方的連接是否有效(帶有cookie的主動連接)。
然后,在/bin/www .js中引入io:
#!/usr/bin/env node // 模塊依賴 var app = require('../app'); var http = require('http'); const socketIndex = require('../src/routes/socket/index/socket'); // 從環(huán)境中取端口,應(yīng)用到express var port = normalizePort(process.env.PORT || '3000'); app.set('port', port); // 創(chuàng)建http服務(wù)(將express注冊到http中) server = http.createServer(app); // 監(jiān)聽 var io = require('socket.io')(server, { cors: { origin: '*' } // path: '/socket' // 重新定義socket連接路徑 }); // 全局聲明 global.io = io; // socket的程序文件下引入io socketIndex.init(io);
其中,引入函數(shù)init()即是上一段代碼中的init函數(shù),傳入?yún)?shù)即為在服務(wù)端入口中創(chuàng)建的io服務(wù)。io服務(wù)中需要傳入cors參數(shù),解決跨域問題,如果想更改websocket連接的地址,則使用path參數(shù),其參數(shù)值即是在原先基礎(chǔ)的websocket連接地址后加上。
客戶端
首先創(chuàng)建一個socket對象,io() 的第一個參數(shù)是鏈接服務(wù)器的 URL,默認情況下是 window.location(需要修改成服務(wù)端的URL,包括對應(yīng)的模塊或權(quán)限對應(yīng)的指定路徑,path參數(shù))。
到此這篇關(guān)于nodejs+socket.io實現(xiàn)p2p消息實時發(fā)送的項目實踐的文章就介紹到這了,更多相關(guān)node socket.io實現(xiàn)p2p實時發(fā)送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解nodejs微信公眾號開發(fā)——2.自動回復(fù)
這篇文章主要介紹了詳解nodejs微信公眾號開發(fā)——2.自動回復(fù),非常具有實用價值,需要的朋友可以參考下2017-04-04node.js解決客戶端請求數(shù)據(jù)里面中文亂碼的事件方法
本文主要介紹了node.js解決客戶端請求數(shù)據(jù)里面中文亂碼的事件方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12在Node.js應(yīng)用程序中處理大數(shù)的操作指南
在JavaScript生態(tài)系統(tǒng)中,你可以使用BigInt來處理大整數(shù),但是,你也可以使用具有類似于BigInt功能的第三方庫,本文將是使用BigInt和提供類似功能的流行庫管理大數(shù)的完整指南,我們還將比較第三方庫的用例、優(yōu)勢和劣勢2023-06-06node連接MongoDB數(shù)據(jù)庫錯誤:MongoServerSelectionError:?connect?ECON
使用node連接MongoDB數(shù)據(jù)庫時發(fā)生報錯,MongoServerSelectionError:?connect?ECONNREFUSED?::1:27017,本文給大家分享原因分析及解決方案,感興趣的朋友跟隨小編一起看看吧2023-04-04基于nodejs使用express創(chuàng)建web服務(wù)器的操作步驟
express實際上是對nodejs內(nèi)置http進行封裝后的第三方包,其中提供了快捷創(chuàng)建web服務(wù)器以及處理請求路由的方法,使我們可以更加方便快捷的實現(xiàn)一個web服務(wù)器項目,本文件給大家詳細介紹基于nodejs使用express?創(chuàng)建web服務(wù)器的操作步驟2023-07-07