一文帶你使用golang手?jǐn)]一個websocket中間件
序言
【1】:少年,你的項目要不要用到websocket呀?
(你說通訊用來干嘛?在線聊天、消息推送、掃碼登錄、物聯(lián)網(wǎng)設(shè)備管理、游戲、彈幕等等這些需要立即得到反饋但又不想給服務(wù)器增加太多負(fù)擔(dān)的業(yè)務(wù)場景都需要用websocket)
【2】:為什么要使用websocket?
我們先來模擬一個簡單的掃碼登錄網(wǎng)頁的場景,網(wǎng)頁端在生成的二維碼后要怎么知道用戶有沒有用手機(jī)掃描這個二維碼呢?在傳統(tǒng)的項目中最長的方式就是不停的去請求后端接口問 “用戶掃了沒?用戶掃了沒?用戶掃了沒?(一直往復(fù))”,直到用戶掃完或者關(guān)閉了網(wǎng)頁。
這種方式就是最常見的 長輪訓(xùn)(我最開始學(xué)寫代碼的時候也是用這種方式),這種方式是最簡單的,但是也與之相對應(yīng)的問題也很明顯,占用太多后臺資源 and 感官延遲,那有沒有一種別的方案能不占資源又快的方式呢?
在類似這種需求下 websocket 誕生了,今天這里我們不談那些枯燥的理論知識,只玩實操!
<!DOCTYPE html> <html> <head> </head> <body> <script> var ws = new WebSocket("ws://127.0.0.1:8282"); ws.onmessage = function(event) { console.log(event.data) }; ws.onclose = function(event) { console.log('ws連接已關(guān)閉') } </script> </body> </html>
上面是一個最簡單的 websocket 連接代碼
嗯~,到這里,你已經(jīng)能正常的連接到服務(wù)器上并等待 服務(wù)器主動 給你發(fā)送消息了。
好的,那現(xiàn)在客戶端準(zhǔn)備好了,服務(wù)端呢,別急,現(xiàn)在我們從 0 開始一起手?jǐn)]一個websocket服務(wù)端
服務(wù)端設(shè)計
開始搭建服務(wù)端前,我們必須先思考一下它架構(gòu)方式。
【1】:為什么是用golang來開發(fā)服務(wù)端,用其他語言不行嗎?
當(dāng)然是可以的!在文章的標(biāo)題中我有提到這個用golang來開發(fā),是因為其他語言都有現(xiàn)成可以用的socket中間件,但是golang似乎還沒有,那我們就來自己擼一個吧!
【2】:服務(wù)端定位
開始動手寫代碼前應(yīng)該提前思考這么一個問題,這個websocket服務(wù)應(yīng)該以一種怎樣的方式存在于項目中呢?
(嵌套組件 OR 獨立中間件)
我的想法是中間件,出于以下原因考慮:
- 我希望它以一個獨立角色存在于我的項目中
- 我不需要它過多的侵入業(yè)務(wù)代碼
- 只要幫我管理好客戶端的鏈接和能正常、快速的推送消息即可
【3】:架構(gòu)設(shè)計
上面我有提到,這個websocket服務(wù)端的兩個主要功能是 管理連接和推送消息
好的,那首先圍繞第一個問題,如何管理連接?當(dāng)服務(wù)端出現(xiàn)N多個連接時要怎么知道誰是誰,消息應(yīng)該推給誰?
寫過php的同學(xué)應(yīng)該知道workerman這個中間件,在workerman中有三個非常重要的概念,client、user、group,其實就是 分類管理,下面我分別解釋一下
- client 任何一個socket連接都會被視為一個client,所有的操作也是圍繞著client在進(jìn)行管理
- user 每個用戶可能會打開多個socket或者說叫頁面,那要把消息發(fā)給這個用戶時就需要把消息通知到不同的client,所以這時就可以把幾個client歸類為一個user。
- group 還有另外一種需求,需要把消息通知到某些用戶,這時就要考慮把client歸類到一個group
在看下面的內(nèi)容前,大家一定要先消化了解這三個概念
還有另外一個問題,怎么讓websocket服務(wù)不侵入業(yè)務(wù)代碼
這里我大概畫了一張草圖,三者之間的關(guān)系可以這樣理解
- 客戶端除了連接websocket發(fā)送心跳信息(這個是服務(wù)所必須的)外,只要被動接受Socket服務(wù)推送過來的消息即可
- 客戶端需要發(fā)送消息時應(yīng)當(dāng)將請求發(fā)送到后端服務(wù),在有后端服務(wù)經(jīng)過業(yè)務(wù)處理后調(diào)取對應(yīng)的Socket接口
- Socket服務(wù)器除了管理連接和推送消息外不用再處理任何與業(yè)務(wù)相關(guān)的內(nèi)容
掉頭發(fā)時間
在博客我只展示一點點代碼哈,其他的都已經(jīng)完全開源到github了,各位看官需要的話自取哈
我們要使用golang來實現(xiàn)socket服務(wù),自然離不開 github.com/gorilla/websocket
這個核心庫啦!
這里不得不說,golang的生態(tài)還是挺完善的。
gorilla/websocket幫我們解決了socket的連接和推送問題,剩下連接關(guān)系管理和服務(wù)接口就是我們要關(guān)注的重點了。
【1】:連接關(guān)系管理
先來給大家上兩段代碼
server.go
package websocket import ( "fmt" "sync" "time" "github.com/golang-module/carbon" "github.com/gorilla/websocket" ) type WebSocketClientBase struct { ID string Conn *websocket.Conn LastHeartbeat int64 BindUid string JoinGroup []string } type WebSocketUserBase struct { Uid string ClientID []string } type WebSocketGroupBase struct { ClientID []string } var GatewayClients, GatewayUser, GatewayGroup sync.Map /** * @description: 客戶端心跳檢測,超時即斷開連接(主要是為了降低服務(wù)端承載壓力) * @param {string} clientID * @return {*} */ func clientHeartbeatCheck(clientID string) { for { time.Sleep(5 * time.Second) clientInterface, exists := GatewayClients.Load(clientID) if !exists { break } client, _ := clientInterface.(*WebSocketClientBase) if (carbon.Now().Timestamp() - client.LastHeartbeat) > int64(HeartbeatTime) { fmt.Println("Client", clientID, "heartbeat timeout") client.Conn.Close() GatewayClients.Delete(clientID) break } } } /** * @description: 客戶端斷線時自動踢出Uid綁定列表 * @param {string} clientID * @param {string} uid * @return {*} */ func clientUnBindUid(clientID string, uid string) { value, ok := GatewayUser.Load(uid) if ok { users := value.(*WebSocketUserBase) for k, v := range users.ClientID { if v == clientID { users.ClientID = append(users.ClientID[:k], users.ClientID[k+1:]...) } } if len(users.ClientID) == 0 { GatewayUser.Delete(uid) } } } /** * @description: 客戶端斷線時自動踢出已加入的群組 * @param {string} clientID * @return {*} */ func clientLeaveGroup(clientID string) { // 使用 Load 方法獲取值 value, ok := GatewayClients.Load(clientID) if !ok { // 如果沒有找到對應(yīng)的值,處理相應(yīng)的邏輯 return } client := value.(*WebSocketClientBase) // 遍歷 JoinGroup for _, v := range client.JoinGroup { // 使用 Load 方法獲取值 groupValue, groupOK := GatewayGroup.Load(v) if !groupOK { // 如果沒有找到對應(yīng)的值,處理相應(yīng)的邏輯 continue } group := groupValue.(*WebSocketGroupBase) // 在群組中找到對應(yīng)的 clientID,并刪除 for j, id := range group.ClientID { if id == clientID { copy(group.ClientID[j:], group.ClientID[j+1:]) group.ClientID = group.ClientID[:len(group.ClientID)-1] // 如果群組中沒有成員了,刪除群組 if len(group.ClientID) == 0 { GatewayGroup.Delete(v) } break } } } }
connect.go
package websocket import ( "fmt" "gateway-websocket/config" "net/http" "runtime/debug" "time" "github.com/gin-gonic/gin" "github.com/golang-module/carbon" "github.com/google/uuid" "github.com/gorilla/websocket" ) var ( upGrader = websocket.Upgrader{ // 設(shè)置消息接收緩沖區(qū)大?。╞yte),如果這個值設(shè)置得太小,可能會導(dǎo)致服務(wù)端在讀取客戶端發(fā)送的大型消息時遇到問題 ReadBufferSize: config.GatewayConfig["ReadBufferSize"].(int), // 設(shè)置消息發(fā)送緩沖區(qū)大小(byte),如果這個值設(shè)置得太小,可能會導(dǎo)致服務(wù)端在發(fā)送大型消息時遇到問題 WriteBufferSize: config.GatewayConfig["WriteBufferSize"].(int), // 消息包啟用壓縮 EnableCompression: config.GatewayConfig["MessageCompression"].(bool), // ws握手超時時間 HandshakeTimeout: time.Duration(config.GatewayConfig["WebsocketHandshakeTimeout"].(int)) * time.Second, // ws握手過程中允許跨域 CheckOrigin: func(r *http.Request) bool { return true }, } // 設(shè)置心跳檢測間隔時長(秒) HeartbeatTime = config.GatewayConfig["HeartbeatTimeout"].(int) ) /** * @description: 初始化客戶端連接 * @param {*websocket.Conn} conn * @return {*} */ func handleClientInit(conn *websocket.Conn) string { clientID := uuid.New().String() client := &WebSocketClientBase{ ID: clientID, Conn: conn, LastHeartbeat: carbon.Now().Timestamp(), } // 使用 Store 方法存儲值 GatewayClients.Store(clientID, client) if err := conn.WriteMessage(config.GatewayConfig["MessageFormat"].(int), []byte(clientID)); err != nil { handleClientDisconnect(clientID) return "" } return clientID } /** * @description: 主動關(guān)閉客戶端連接 * @param {string} clientID * @return {*} */ func handleClientDisconnect(clientID string) { // 使用 Load 和 Delete 方法,不需要額外的鎖定操作 v, ok := GatewayClients.Load(clientID) if ok { client := v.(*WebSocketClientBase) if client.BindUid != "" { clientUnBindUid(clientID, client.BindUid) } if len(client.JoinGroup) > 0 { clientLeaveGroup(clientID) } GatewayClients.Delete(clientID) } } /** * @description: 向客戶端回復(fù)心跳消息 * @param {*websocket.Conn} conn * @param {string} clientID * @param {int} messageType * @param {[]byte} message * @return {*} */ func handleClientMessage(conn *websocket.Conn, clientID string, messageType int, message []byte) { // 使用 Load 方法獲取值 v, ok := GatewayClients.Load(clientID) if !ok { // 如果沒有找到對應(yīng)的值,處理相應(yīng)的邏輯 handleClientDisconnect(clientID) return } client := v.(*WebSocketClientBase) if messageType == config.GatewayConfig["MessageFormat"].(int) && string(message) == "ping" { if err := conn.WriteMessage(config.GatewayConfig["MessageFormat"].(int), []byte("pong")); err != nil { handleClientDisconnect(clientID) return } GatewayClients.Store(clientID, &WebSocketClientBase{ ID: clientID, Conn: conn, LastHeartbeat: carbon.Now().Timestamp(), BindUid: client.BindUid, JoinGroup: client.JoinGroup, }) } } func WsServer(c *gin.Context) { defer func() { if err := recover(); err != nil { fmt.Printf("WsServer panic: %v\n", err) debug.PrintStack() } }() // 將 HTTP 連接升級為 WebSocket 連接 conn, err := upGrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } defer conn.Close() // 客戶端唯一身份標(biāo)識 clientID := handleClientInit(conn) // 發(fā)送客戶端唯一標(biāo)識 ID if clientID == "" { return } go clientHeartbeatCheck(clientID) for { // 讀取客戶端發(fā)送過來的消息 messageType, message, err := conn.ReadMessage() // 當(dāng)收到err時則標(biāo)識客戶端連接出現(xiàn)異常,如斷線 if err != nil { handleClientDisconnect(clientID) } else { handleClientMessage(conn, clientID, messageType, message) } } }
在上面的代碼中,我創(chuàng)建了一個websocket的連接服務(wù)和使用了3個sync.Map
來分別存放管理不同的客戶端連接
(在做這種存在高并發(fā)場景的業(yè)務(wù)時不要使用Map而是用sync.Map,因為go的Map是非線程安全的,在并發(fā)時會造成資源競爭從而導(dǎo)致你的程序宕掉,這點一定要注意!??!)
Stop,文章好像被拉的太長了(⊙o⊙)…,那就只展示一點點吧,其他的代碼和php操作Demo都完全開源到github啦,大家自取哈。
測試時間
代碼寫完,先把程序run起來
然后壓測安排上
大家可以在自己電腦上試試看,我這個Jmeter不知道什么原因,線程數(shù)超過1000后就運(yùn)行很慢了
(單純是Jmeter慢,不是go哈,也可能是我電腦的問題)
以上就是一文帶你使用golang手?jǐn)]一個websocket中間件的詳細(xì)內(nèi)容,更多關(guān)于go websocket中間件的資料請關(guān)注腳本之家其它相關(guān)文章!
- Go語言實現(xiàn)百萬級WebSocket連接架構(gòu)設(shè)計及服務(wù)優(yōu)化
- 使用Golang創(chuàng)建單獨的WebSocket會話
- Golang?WebSocket創(chuàng)建單獨會話詳細(xì)實例
- Golang構(gòu)建WebSocket服務(wù)器和客戶端的示例詳解
- golang用melody搭建輕量的websocket服務(wù)的示例代碼
- 基于Go+WebSocket實現(xiàn)實時通信功能
- Golang實現(xiàn)WebSocket服務(wù)的項目實踐
- Go語言實現(xiàn)websocket推送程序
- Go?實現(xiàn)?WebSockets之創(chuàng)建?WebSockets
- 深入理解Golang中WebSocket和WSS的支持
相關(guān)文章
Go Run, Go Build, Go Install的區(qū)別
本文深入探討Go語言中g(shù)orun、gobuild和goinstall三個常用命令的功能區(qū)別和適用場景,文中通過具體代碼示例,詳細(xì)解釋了各命令的使用方式及其應(yīng)用場景,幫助開發(fā)者高效利用這些工具2024-10-10Go并發(fā)讀寫文件、分片寫、分片下載文件的實現(xiàn)示例
讀寫文件在很多項目中都可以用到,本文主要介紹了Go并發(fā)讀寫文件、分片寫、分片下載文件的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2024-01-01GO中?分組聲明與array,?slice,?map函數(shù)
這篇文章主要介紹了GO中?分組聲明與array,slice,map函數(shù),Go語言中,同時聲明多個常量、變量,或者導(dǎo)入多個包時,可采用分組的方式進(jìn)行聲明,下面詳細(xì)介紹需要的小伙伴可以參考一下2022-03-03