一文帶你使用golang手?jǐn)]一個(gè)websocket中間件
序言
【1】:少年,你的項(xiàng)目要不要用到websocket呀?
(你說(shuō)通訊用來(lái)干嘛?在線聊天、消息推送、掃碼登錄、物聯(lián)網(wǎng)設(shè)備管理、游戲、彈幕等等這些需要立即得到反饋但又不想給服務(wù)器增加太多負(fù)擔(dān)的業(yè)務(wù)場(chǎng)景都需要用websocket)
【2】:為什么要使用websocket?
我們先來(lái)模擬一個(gè)簡(jiǎn)單的掃碼登錄網(wǎng)頁(yè)的場(chǎng)景,網(wǎng)頁(yè)端在生成的二維碼后要怎么知道用戶(hù)有沒(méi)有用手機(jī)掃描這個(gè)二維碼呢?在傳統(tǒng)的項(xiàng)目中最長(zhǎng)的方式就是不停的去請(qǐng)求后端接口問(wèn) “用戶(hù)掃了沒(méi)?用戶(hù)掃了沒(méi)?用戶(hù)掃了沒(méi)?(一直往復(fù))”,直到用戶(hù)掃完或者關(guān)閉了網(wǎng)頁(yè)。
這種方式就是最常見(jiàn)的 長(zhǎng)輪訓(xùn)(我最開(kāi)始學(xué)寫(xiě)代碼的時(shí)候也是用這種方式),這種方式是最簡(jiǎn)單的,但是也與之相對(duì)應(yīng)的問(wèn)題也很明顯,占用太多后臺(tái)資源 and 感官延遲,那有沒(méi)有一種別的方案能不占資源又快的方式呢?
在類(lèi)似這種需求下 websocket 誕生了,今天這里我們不談那些枯燥的理論知識(shí),只玩實(shí)操!
<!DOCTYPE html> <html> <head> </head> <body> <script> var ws = new WebSocket("ws://127.0.0.1:8282"); ws.onmessage = function(event) { console.log(event.data) }; ws.onclose = function(event) { console.log('ws連接已關(guān)閉') } </script> </body> </html>
上面是一個(gè)最簡(jiǎn)單的 websocket 連接代碼
嗯~,到這里,你已經(jīng)能正常的連接到服務(wù)器上并等待 服務(wù)器主動(dòng) 給你發(fā)送消息了。
好的,那現(xiàn)在客戶(hù)端準(zhǔn)備好了,服務(wù)端呢,別急,現(xiàn)在我們從 0 開(kāi)始一起手?jǐn)]一個(gè)websocket服務(wù)端
服務(wù)端設(shè)計(jì)
開(kāi)始搭建服務(wù)端前,我們必須先思考一下它架構(gòu)方式。
【1】:為什么是用golang來(lái)開(kāi)發(fā)服務(wù)端,用其他語(yǔ)言不行嗎?
當(dāng)然是可以的!在文章的標(biāo)題中我有提到這個(gè)用golang來(lái)開(kāi)發(fā),是因?yàn)槠渌Z(yǔ)言都有現(xiàn)成可以用的socket中間件,但是golang似乎還沒(méi)有,那我們就來(lái)自己擼一個(gè)吧!
【2】:服務(wù)端定位
開(kāi)始動(dòng)手寫(xiě)代碼前應(yīng)該提前思考這么一個(gè)問(wèn)題,這個(gè)websocket服務(wù)應(yīng)該以一種怎樣的方式存在于項(xiàng)目中呢?
(嵌套組件 OR 獨(dú)立中間件)
我的想法是中間件,出于以下原因考慮:
- 我希望它以一個(gè)獨(dú)立角色存在于我的項(xiàng)目中
- 我不需要它過(guò)多的侵入業(yè)務(wù)代碼
- 只要幫我管理好客戶(hù)端的鏈接和能正常、快速的推送消息即可
【3】:架構(gòu)設(shè)計(jì)
上面我有提到,這個(gè)websocket服務(wù)端的兩個(gè)主要功能是 管理連接和推送消息
好的,那首先圍繞第一個(gè)問(wèn)題,如何管理連接?當(dāng)服務(wù)端出現(xiàn)N多個(gè)連接時(shí)要怎么知道誰(shuí)是誰(shuí),消息應(yīng)該推給誰(shuí)?
寫(xiě)過(guò)php的同學(xué)應(yīng)該知道workerman這個(gè)中間件,在workerman中有三個(gè)非常重要的概念,client、user、group,其實(shí)就是 分類(lèi)管理,下面我分別解釋一下
- client 任何一個(gè)socket連接都會(huì)被視為一個(gè)client,所有的操作也是圍繞著client在進(jìn)行管理
- user 每個(gè)用戶(hù)可能會(huì)打開(kāi)多個(gè)socket或者說(shuō)叫頁(yè)面,那要把消息發(fā)給這個(gè)用戶(hù)時(shí)就需要把消息通知到不同的client,所以這時(shí)就可以把幾個(gè)client歸類(lèi)為一個(gè)user。
- group 還有另外一種需求,需要把消息通知到某些用戶(hù),這時(shí)就要考慮把client歸類(lèi)到一個(gè)group
在看下面的內(nèi)容前,大家一定要先消化了解這三個(gè)概念
還有另外一個(gè)問(wèn)題,怎么讓websocket服務(wù)不侵入業(yè)務(wù)代碼
這里我大概畫(huà)了一張草圖,三者之間的關(guān)系可以這樣理解
- 客戶(hù)端除了連接websocket發(fā)送心跳信息(這個(gè)是服務(wù)所必須的)外,只要被動(dòng)接受Socket服務(wù)推送過(guò)來(lái)的消息即可
- 客戶(hù)端需要發(fā)送消息時(shí)應(yīng)當(dāng)將請(qǐng)求發(fā)送到后端服務(wù),在有后端服務(wù)經(jīng)過(guò)業(yè)務(wù)處理后調(diào)取對(duì)應(yīng)的Socket接口
- Socket服務(wù)器除了管理連接和推送消息外不用再處理任何與業(yè)務(wù)相關(guān)的內(nèi)容
掉頭發(fā)時(shí)間
在博客我只展示一點(diǎn)點(diǎn)代碼哈,其他的都已經(jīng)完全開(kāi)源到github了,各位看官需要的話自取哈
我們要使用golang來(lái)實(shí)現(xiàn)socket服務(wù),自然離不開(kāi) github.com/gorilla/websocket
這個(gè)核心庫(kù)啦!
這里不得不說(shuō),golang的生態(tài)還是挺完善的。
gorilla/websocket幫我們解決了socket的連接和推送問(wèn)題,剩下連接關(guān)系管理和服務(wù)接口就是我們要關(guān)注的重點(diǎn)了。
【1】:連接關(guān)系管理
先來(lái)給大家上兩段代碼
server.go
package websocket import ( "fmt" "sync" "time" "github.com/golang-module/carbon" "github.com/gorilla/websocket" ) type WebSocketClientBase struct { ID string Conn *websocket.Conn LastHeartbeat int64 BindUid string JoinGroup []string } type WebSocketUserBase struct { Uid string ClientID []string } type WebSocketGroupBase struct { ClientID []string } var GatewayClients, GatewayUser, GatewayGroup sync.Map /** * @description: 客戶(hù)端心跳檢測(cè),超時(shí)即斷開(kāi)連接(主要是為了降低服務(wù)端承載壓力) * @param {string} clientID * @return {*} */ func clientHeartbeatCheck(clientID string) { for { time.Sleep(5 * time.Second) clientInterface, exists := GatewayClients.Load(clientID) if !exists { break } client, _ := clientInterface.(*WebSocketClientBase) if (carbon.Now().Timestamp() - client.LastHeartbeat) > int64(HeartbeatTime) { fmt.Println("Client", clientID, "heartbeat timeout") client.Conn.Close() GatewayClients.Delete(clientID) break } } } /** * @description: 客戶(hù)端斷線時(shí)自動(dòng)踢出Uid綁定列表 * @param {string} clientID * @param {string} uid * @return {*} */ func clientUnBindUid(clientID string, uid string) { value, ok := GatewayUser.Load(uid) if ok { users := value.(*WebSocketUserBase) for k, v := range users.ClientID { if v == clientID { users.ClientID = append(users.ClientID[:k], users.ClientID[k+1:]...) } } if len(users.ClientID) == 0 { GatewayUser.Delete(uid) } } } /** * @description: 客戶(hù)端斷線時(shí)自動(dòng)踢出已加入的群組 * @param {string} clientID * @return {*} */ func clientLeaveGroup(clientID string) { // 使用 Load 方法獲取值 value, ok := GatewayClients.Load(clientID) if !ok { // 如果沒(méi)有找到對(duì)應(yīng)的值,處理相應(yīng)的邏輯 return } client := value.(*WebSocketClientBase) // 遍歷 JoinGroup for _, v := range client.JoinGroup { // 使用 Load 方法獲取值 groupValue, groupOK := GatewayGroup.Load(v) if !groupOK { // 如果沒(méi)有找到對(duì)應(yīng)的值,處理相應(yīng)的邏輯 continue } group := groupValue.(*WebSocketGroupBase) // 在群組中找到對(duì)應(yīng)的 clientID,并刪除 for j, id := range group.ClientID { if id == clientID { copy(group.ClientID[j:], group.ClientID[j+1:]) group.ClientID = group.ClientID[:len(group.ClientID)-1] // 如果群組中沒(méi)有成員了,刪除群組 if len(group.ClientID) == 0 { GatewayGroup.Delete(v) } break } } } }
connect.go
package websocket import ( "fmt" "gateway-websocket/config" "net/http" "runtime/debug" "time" "github.com/gin-gonic/gin" "github.com/golang-module/carbon" "github.com/google/uuid" "github.com/gorilla/websocket" ) var ( upGrader = websocket.Upgrader{ // 設(shè)置消息接收緩沖區(qū)大?。╞yte),如果這個(gè)值設(shè)置得太小,可能會(huì)導(dǎo)致服務(wù)端在讀取客戶(hù)端發(fā)送的大型消息時(shí)遇到問(wèn)題 ReadBufferSize: config.GatewayConfig["ReadBufferSize"].(int), // 設(shè)置消息發(fā)送緩沖區(qū)大?。╞yte),如果這個(gè)值設(shè)置得太小,可能會(huì)導(dǎo)致服務(wù)端在發(fā)送大型消息時(shí)遇到問(wèn)題 WriteBufferSize: config.GatewayConfig["WriteBufferSize"].(int), // 消息包啟用壓縮 EnableCompression: config.GatewayConfig["MessageCompression"].(bool), // ws握手超時(shí)時(shí)間 HandshakeTimeout: time.Duration(config.GatewayConfig["WebsocketHandshakeTimeout"].(int)) * time.Second, // ws握手過(guò)程中允許跨域 CheckOrigin: func(r *http.Request) bool { return true }, } // 設(shè)置心跳檢測(cè)間隔時(shí)長(zhǎng)(秒) HeartbeatTime = config.GatewayConfig["HeartbeatTimeout"].(int) ) /** * @description: 初始化客戶(hù)端連接 * @param {*websocket.Conn} conn * @return {*} */ func handleClientInit(conn *websocket.Conn) string { clientID := uuid.New().String() client := &WebSocketClientBase{ ID: clientID, Conn: conn, LastHeartbeat: carbon.Now().Timestamp(), } // 使用 Store 方法存儲(chǔ)值 GatewayClients.Store(clientID, client) if err := conn.WriteMessage(config.GatewayConfig["MessageFormat"].(int), []byte(clientID)); err != nil { handleClientDisconnect(clientID) return "" } return clientID } /** * @description: 主動(dòng)關(guān)閉客戶(hù)端連接 * @param {string} clientID * @return {*} */ func handleClientDisconnect(clientID string) { // 使用 Load 和 Delete 方法,不需要額外的鎖定操作 v, ok := GatewayClients.Load(clientID) if ok { client := v.(*WebSocketClientBase) if client.BindUid != "" { clientUnBindUid(clientID, client.BindUid) } if len(client.JoinGroup) > 0 { clientLeaveGroup(clientID) } GatewayClients.Delete(clientID) } } /** * @description: 向客戶(hù)端回復(fù)心跳消息 * @param {*websocket.Conn} conn * @param {string} clientID * @param {int} messageType * @param {[]byte} message * @return {*} */ func handleClientMessage(conn *websocket.Conn, clientID string, messageType int, message []byte) { // 使用 Load 方法獲取值 v, ok := GatewayClients.Load(clientID) if !ok { // 如果沒(méi)有找到對(duì)應(yīng)的值,處理相應(yīng)的邏輯 handleClientDisconnect(clientID) return } client := v.(*WebSocketClientBase) if messageType == config.GatewayConfig["MessageFormat"].(int) && string(message) == "ping" { if err := conn.WriteMessage(config.GatewayConfig["MessageFormat"].(int), []byte("pong")); err != nil { handleClientDisconnect(clientID) return } GatewayClients.Store(clientID, &WebSocketClientBase{ ID: clientID, Conn: conn, LastHeartbeat: carbon.Now().Timestamp(), BindUid: client.BindUid, JoinGroup: client.JoinGroup, }) } } func WsServer(c *gin.Context) { defer func() { if err := recover(); err != nil { fmt.Printf("WsServer panic: %v\n", err) debug.PrintStack() } }() // 將 HTTP 連接升級(jí)為 WebSocket 連接 conn, err := upGrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } defer conn.Close() // 客戶(hù)端唯一身份標(biāo)識(shí) clientID := handleClientInit(conn) // 發(fā)送客戶(hù)端唯一標(biāo)識(shí) ID if clientID == "" { return } go clientHeartbeatCheck(clientID) for { // 讀取客戶(hù)端發(fā)送過(guò)來(lái)的消息 messageType, message, err := conn.ReadMessage() // 當(dāng)收到err時(shí)則標(biāo)識(shí)客戶(hù)端連接出現(xiàn)異常,如斷線 if err != nil { handleClientDisconnect(clientID) } else { handleClientMessage(conn, clientID, messageType, message) } } }
在上面的代碼中,我創(chuàng)建了一個(gè)websocket的連接服務(wù)和使用了3個(gè)sync.Map
來(lái)分別存放管理不同的客戶(hù)端連接
(在做這種存在高并發(fā)場(chǎng)景的業(yè)務(wù)時(shí)不要使用Map而是用sync.Map,因?yàn)間o的Map是非線程安全的,在并發(fā)時(shí)會(huì)造成資源競(jìng)爭(zhēng)從而導(dǎo)致你的程序宕掉,這點(diǎn)一定要注意!?。?
Stop,文章好像被拉的太長(zhǎng)了(⊙o⊙)…,那就只展示一點(diǎn)點(diǎn)吧,其他的代碼和php操作Demo都完全開(kāi)源到github啦,大家自取哈。
測(cè)試時(shí)間
代碼寫(xiě)完,先把程序run起來(lái)
然后壓測(cè)安排上
大家可以在自己電腦上試試看,我這個(gè)Jmeter不知道什么原因,線程數(shù)超過(guò)1000后就運(yùn)行很慢了
(單純是Jmeter慢,不是go哈,也可能是我電腦的問(wèn)題)
以上就是一文帶你使用golang手?jǐn)]一個(gè)websocket中間件的詳細(xì)內(nèi)容,更多關(guān)于go websocket中間件的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Go語(yǔ)言實(shí)現(xiàn)百萬(wàn)級(jí)WebSocket連接架構(gòu)設(shè)計(jì)及服務(wù)優(yōu)化
- 使用Golang創(chuàng)建單獨(dú)的WebSocket會(huì)話
- Golang?WebSocket創(chuàng)建單獨(dú)會(huì)話詳細(xì)實(shí)例
- Golang構(gòu)建WebSocket服務(wù)器和客戶(hù)端的示例詳解
- golang用melody搭建輕量的websocket服務(wù)的示例代碼
- 基于Go+WebSocket實(shí)現(xiàn)實(shí)時(shí)通信功能
- Golang實(shí)現(xiàn)WebSocket服務(wù)的項(xiàng)目實(shí)踐
- Go語(yǔ)言實(shí)現(xiàn)websocket推送程序
- Go?實(shí)現(xiàn)?WebSockets之創(chuàng)建?WebSockets
- 深入理解Golang中WebSocket和WSS的支持
相關(guān)文章
Go Run, Go Build, Go Install的區(qū)別
本文深入探討Go語(yǔ)言中g(shù)orun、gobuild和goinstall三個(gè)常用命令的功能區(qū)別和適用場(chǎng)景,文中通過(guò)具體代碼示例,詳細(xì)解釋了各命令的使用方式及其應(yīng)用場(chǎng)景,幫助開(kāi)發(fā)者高效利用這些工具2024-10-10Go并發(fā)讀寫(xiě)文件、分片寫(xiě)、分片下載文件的實(shí)現(xiàn)示例
讀寫(xiě)文件在很多項(xiàng)目中都可以用到,本文主要介紹了Go并發(fā)讀寫(xiě)文件、分片寫(xiě)、分片下載文件的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01GO中?分組聲明與array,?slice,?map函數(shù)
這篇文章主要介紹了GO中?分組聲明與array,slice,map函數(shù),Go語(yǔ)言中,同時(shí)聲明多個(gè)常量、變量,或者導(dǎo)入多個(gè)包時(shí),可采用分組的方式進(jìn)行聲明,下面詳細(xì)介紹需要的小伙伴可以參考一下2022-03-03