ASP.NET Core WebSocket集群實現(xiàn)思路詳解
前言
提到WebSocket
相信大家都聽說過,它的初衷是為了解決客戶端瀏覽器與服務(wù)端進行雙向通信,是在單個TCP
連接上進行全雙工通訊的協(xié)議。在沒有WebSocket之前只能通過瀏覽器到服務(wù)端的請求應(yīng)答模式比如輪詢,來實現(xiàn)服務(wù)端的變更響應(yīng)到客戶端,現(xiàn)在服務(wù)端也可以主動發(fā)送數(shù)據(jù)到客戶端瀏覽器。WebSocket
協(xié)議和Http
協(xié)議平行,都屬于TCP/IP四層模型
中的第四層應(yīng)用層。由于WebSocket
握手階段采用HTTP
協(xié)議,所以也需要進行跨域處理。它的協(xié)議標識是ws
或wss
對應(yīng)了常規(guī)標識和安全通信協(xié)議標識。本文重點并不是介紹WebSocket
協(xié)議相關(guān),而是提供一種基于ASP.NET Core原生WebSocket的方式實現(xiàn)集群的實現(xiàn)思路。關(guān)于這套思路其實很早之前我就構(gòu)思過了,只是之前一直沒有系統(tǒng)的整理出來,本篇文章就來和大家分享一下,由于主要是提供一種思路,所以涉及到具體細節(jié)或者業(yè)務(wù)相關(guān)的可能沒有體現(xiàn)出來,還望大家理解。
實現(xiàn)
咱們的重點關(guān)鍵字就是兩個WebSocket
和集群
,實現(xiàn)的框架便是基于ASP.NET Core
,我也基于golang
實現(xiàn)了一套,本文涉及到的相關(guān)源碼和golang版本的實現(xiàn)都已上傳至我的github,具體倉庫地址可以轉(zhuǎn)到文末自行跳轉(zhuǎn)到#示例源碼中查看。既然涉及到集群,這里咱們就用nginx
作為反向代理,來搭建一個集群實例。大致的示例結(jié)構(gòu)如下圖所示
redis
在這里扮演的角色呢,是用來處理Server
端的消息相互傳遞用的,主要是使用的redis的pub/sub
功能來實現(xiàn)的,這里便涉及到幾個核心問題
- 首先,集群狀態(tài)每個用戶被分發(fā)到具體的哪臺服務(wù)器上是不得而知的
- 其次,處在不同
Server
端的不同用戶間的相互通信是需要一個傳遞媒介 - 最后,針對不同的場景比如單發(fā)消息、分組消息、全部通知等要有不同的處理策略
這里需要考慮的是,如果需要搭建實時通信服務(wù)器的話,需要注意集群的隔離性,主要是和核心業(yè)務(wù)進行隔離,畢竟WebSocket
需要保持長鏈接、且消息的大小需要評估。
上面提到了redis
的主要功能就是用來傳遞消息用的,畢竟每個server服務(wù)器是無狀態(tài)的。這當(dāng)然不是必須的,任何可以進行消息分發(fā)的中間件都可以,比如消息隊列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要處理的消息存儲起來都可以比如緩存甚至是關(guān)系型數(shù)據(jù)庫等等。這壓力使用redis主要是因為操作起來簡單、輕量級、靈活,讓大家關(guān)注點在思路上,而不是使用中案件的代碼上。
nginx配置
通過上面的圖我們可以看到,我們這里構(gòu)建集群示例使用的nginx,如果讓nginx支持WebSocket的話,需要額外的配置,這個在網(wǎng)上有很多相關(guān)的文章介紹,這里就來列一下咱們示例的nginx配置,在配置文件nginx.conf
里
//上游服務(wù)器地址也就是websocket服務(wù)的真實地址 upstream wsbackend { server 127.0.0.1:5001; server 127.0.0.1:5678; } server { listen 5000; server_name localhost; location ~/chat/{ //upstream地址 proxy_pass http://wsbackend; proxy_connect_timeout 60s; proxy_read_timeout 3600s; proxy_send_timeout 3600s; //記得轉(zhuǎn)發(fā)避免踩坑 proxy_set_header Host $host; proxy_http_version 1.1; //http升級成websocket協(xié)議的頭標識 proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; } }
這套配置呢,在搜索引擎上能收到很多,不過不妨礙我把使用的粘貼出來。這一套親測有效,也是我使用的配置,請放心使用。個人認為如果是線上環(huán)境采用的負載均衡策略可以選擇ip_hash
的方式,保證同一個ip的客戶端用戶可以分發(fā)到一臺WebSocket實例中去,這樣的話能盡量避免使用redis的用戶頻道做消息傳遞。好了,接下來準備開始展示具體實現(xiàn)的代碼了。
一對一發(fā)送
首先介紹的就是一對一發(fā)送的情況,也就是我把消息發(fā)給你,聊天的時候私聊的情況。這里呢涉及到兩種情況
- 如果你需要通信的客戶端和你連接在一個Server端里,這樣的話可以直接在鏈接里找到這個端的通信實例直接發(fā)送。
- 如果你需要通信的客戶端和你不在一個Server端里,這個時候咱們就需要借助redis的
pub/sub
的功能,把消息傳遞給另一個Server端。
咱們通過一張圖大致的展示一下它的工作方式
解釋一下,每個客戶端注冊到WebSocket
服務(wù)里的時候會在redis里訂閱一個user:用戶唯一標識
的頻道,這個頻道用于接收和當(dāng)前WebSocket連接不在一個服務(wù)端的其他WebSocket發(fā)送過來的消息。
每次發(fā)送消息的時候你會知道你要發(fā)送給誰,不在當(dāng)前服務(wù)器的話則發(fā)送到redis的user:用戶唯一標識
頻道,這樣的話目標WebSocket就能收到消息了。
首先是注入相關(guān)的依賴項,這里我使用的redis客戶端是freeredis
,主要是因為操作起來簡單,具體實現(xiàn)代碼如下
var builder = WebApplication.CreateBuilder(args); //注冊freeredis builder.Services.AddSingleton(provider => { var logger = provider.GetService<ILogger<WebSocketChannelHandler>>(); RedisClient cli = new RedisClient("127.0.0.1:6379"); cli.Notice += (s, e) => logger?.LogInformation(e.Log); return cli; }); //注冊WebSocket具體操作的類 builder.Services.AddSingleton<WebSocketHandler>(); builder.Services.AddControllers(); var app = builder.Build(); var webSocketOptions = new WebSocketOptions { KeepAliveInterval = TimeSpan.FromMinutes(2) }; //注冊WebSocket中間件 app.UseWebSockets(webSocketOptions); app.MapGet("/", () => "Hello World!"); app.MapControllers(); app.Run();
接下來我們定義一個Controller用來處理WebSocket請求
public class WebSocketController : ControllerBase { private readonly ILogger<WebSocketController> _logger; private readonly WebSocketHandler _socketHandler; public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler) { _logger = logger; _socketHandler = socketHandler; } //這里的id代表當(dāng)前連接的客戶端唯一標識比如用戶唯一標識 [HttpGet("/chat/user/{id}")] public async Task ChatUser(string id) { //判斷是否是WebSocket請求 if (HttpContext.WebSockets.IsWebSocketRequest) { _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join"); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); //處理請求相關(guān) await _socketHandler.Handle(id, webSocket); } else { HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; } } }
這里的WebSocketHandler是用來處理具體邏輯用的,咱們看一下相關(guān)代碼
public class WebSocketHandler:IDisposable { //存儲當(dāng)前服務(wù)用戶的集合 private readonly UserConnection UserConnection = new(); //redis頻道前綴 private readonly string userPrefix = "user:"; //用戶對應(yīng)的redis頻道 private readonly ConcurrentDictionary<string, IDisposable> _disposables = new(); private readonly ILogger<WebSocketHandler> _logger; //redis客戶端 private readonly RedisClient _redisClient; public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient) { _logger = logger; _redisClient = redisClient; } public async Task Handle(string id, WebSocket webSocket) { //把當(dāng)前用戶連接存儲起來 _ = UserConnection.GetOrAdd(id, webSocket); //訂閱一個當(dāng)前用戶的頻道 await SubMsg($"{userPrefix}{id}"); var buffer = new byte[1024 * 4]; //接收發(fā)送過來的消息,這個方法是阻塞的,如果沒收到消息則一直阻塞 var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); //循環(huán)接收消息 while (webSocket.State == WebSocketState.Open) { try { //因為緩沖區(qū)長度是固定的所以要獲取實際長度 string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0'); //接收的到消息轉(zhuǎn)換成實體 MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg); //發(fā)送到其他客戶端的數(shù)據(jù) byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}"); _logger.LogInformation($"user {id} send:{msgBody.Msg}"); //判斷目標客戶端是否在當(dāng)前當(dāng)前服務(wù),如果在當(dāng)前服務(wù)直接扎到目標連接直接發(fā)送 if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)) { if (targetSocket.State == WebSocketState.Open) { await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None); } } else { //如果要發(fā)送的目標端不在當(dāng)前服務(wù),則發(fā)送給目標redis端的頻道 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg }; //目標的redis頻道 _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody)); } //繼續(xù)阻塞循環(huán)接收消息 receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } catch (Exception ex) { _logger.LogError(ex, ex.Message); break; } } //循環(huán)結(jié)束意味著當(dāng)前端已經(jīng)退出 //從當(dāng)前用戶的集合移除當(dāng)前用戶 _ = UserConnection.TryRemove(id, out _); //關(guān)閉當(dāng)前WebSocket連接 await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None); //在當(dāng)前訂閱集合移除當(dāng)前用戶 _disposables.TryRemove($"{userPrefix}{id}", out var disposable); //關(guān)閉當(dāng)前用戶的通道 disposable.Dispose(); } private async Task SubMsg(string channel) { //訂閱當(dāng)前用戶頻道 var sub = _redisClient.Subscribe(channel, async (channel, data) => { //接收過來當(dāng)前頻道數(shù)據(jù),說明發(fā)送端不在當(dāng)前服務(wù) ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}"); //在當(dāng)前服務(wù)找到目標的WebSocket連接并發(fā)送消息 if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)) { if (targetSocket.State == WebSocketState.Open) { await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } } }); //把redis訂閱頻道添加到集合中 _disposables.TryAdd(channel, sub); } //程序退出的時候取消當(dāng)前服務(wù)訂閱的redis頻道 public void Dispose() { foreach (var disposable in _disposables) { disposable.Value.Dispose(); } _disposables.Clear(); } }
這里涉及到幾個輔助相關(guān)的類,其中UserConnection
類是存儲注冊到當(dāng)前服務(wù)的連接,MsgBody
類用來接受客戶端發(fā)送過來的消息,ChannelMsgBody
是用來發(fā)送redis頻道的相關(guān)消息,因為要把相關(guān)消息通過redis發(fā)布出去,咱們列一下這幾個類的相關(guān)代碼
//注冊到當(dāng)前服務(wù)的連接 public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>> { //存儲用戶唯一標識和WebSocket的對應(yīng)關(guān)系 private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>(); //當(dāng)前服務(wù)的用戶數(shù)量 public int Count => _users.Count; public WebSocket GetOrAdd(string userId, WebSocket webSocket) { return _users.GetOrAdd(userId, webSocket); } public bool TryGetValue(string userId, out WebSocket webSocket) { return _users.TryGetValue(userId, out webSocket); } public bool TryRemove(string userId, out WebSocket webSocket) { return _users.TryRemove(userId, out webSocket); } public void Clear() { _users.Clear(); } public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator() { return _users.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); } } //客戶端消息 public class MsgBody { //目標用戶標識 public string Id { get; set; } //要發(fā)送的消息 public string Msg { get; set; } } //頻道訂閱消息 public class ChannelMsgBody { //用戶標識 public string FromId { get; set; } //目標用戶標識,也就是要發(fā)送給誰 public string ToId { get; set; } //要發(fā)送的消息 public string Msg { get; set; } }
這樣的話關(guān)于一對一發(fā)送消息的相關(guān)邏輯就實現(xiàn)完成了,啟動兩個Server端,由于nginx默認的負載均衡策略是輪詢,所以注冊兩個用戶的話會被分發(fā)到不同的服務(wù)里去
用Postman
連接三個連接唯一標識分別是1、2、3
,模擬一下消息發(fā)送,效果如下,發(fā)送效果
接收效果
群組發(fā)送
上面我們展示了一對一發(fā)送的情況,接下來我們來看一下,群組發(fā)送的情況。群組發(fā)送的話就是只要大家都加入一個群組,只要客戶端在群組里發(fā)送一條消息,則注冊到當(dāng)前群組內(nèi)的所有客戶端都可以收到消息。相對于一對一的情況就是如果當(dāng)前WebSocket服務(wù)端如果存在用戶加入某個群組,則當(dāng)前當(dāng)前WebSocket服務(wù)端則可以訂閱一個group:群組唯一標識
的redis頻道,集群中的其他WebSocket服務(wù)器通過這個redis頻道接收群組消息,通過一張圖描述一下
群組的實現(xiàn)方式相對于一對一要簡單一點
- 發(fā)送端可以不用考慮當(dāng)前服務(wù)中的客戶端連接,一股腦的交給redis把消息發(fā)布出去
- 如果有WebSocket服務(wù)中的用戶訂閱了當(dāng)前分組則可以接受消息,獲取組內(nèi)的用戶循環(huán)發(fā)送消息
展示一下代碼實現(xiàn)的方式,首先是定義一個action用于表示群組的相關(guān)場景
//包含兩個標識一個是組別標識一個是注冊到組別的用戶 [HttpGet("/chat/group/{groupId}/{userId}")] public async Task ChatGroup(string groupId, string userId) { if (HttpContext.WebSockets.IsWebSocketRequest) { _logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join"); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); //調(diào)用HandleGroup處理群組相關(guān)的消息 await _socketHandler.HandleGroup(groupId, userId, webSocket); } else { HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; } }
接下來看一下HandleGroup的相關(guān)邏輯,還是在WebSocketHandler類中,看一下代碼實現(xiàn)
public class WebSocketHandler:IDisposable { private readonly UserConnection UserConnection = new(); private readonly GroupUser GroupUser = new(); private readonly SemaphoreSlim _lock = new(1, 1); private readonly ConcurrentDictionary<string, IDisposable> _disposables = new(); private readonly string groupPrefix = "group:"; private readonly ILogger<WebSocketHandler> _logger; private readonly RedisClient _redisClient; public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient) { _logger = logger; _redisClient = redisClient; } public async Task HandleGroup(string groupId, string userId, WebSocket webSocket) { //因為群組的集合可能會存在很多用戶一起訪問所以限制訪問數(shù)量 await _lock.WaitAsync(); //初始化群組容器 群唯一標識為key 群員容器為value var currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { }); //當(dāng)前用戶加入當(dāng)前群組 _ = currentGroup.GetOrAdd(userId, webSocket); //只有有當(dāng)前WebSocket服務(wù)的第一個加入當(dāng)前組的時候才去訂閱群組頻道 //如果不限制的話則會出現(xiàn)如果當(dāng)前WebSocket服務(wù)有多個用戶在一個組內(nèi)則會重復(fù)收到redis消息 if (currentGroup.Count == 1) { //訂閱redis頻道 await SubGroupMsg($"{groupPrefix}{groupId}"); } _lock.Release(); var buffer = new byte[1024 * 4]; //阻塞接收WebSocket消息 var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); //服務(wù)不退出的話則一直等待接收 while (webSocket.State == WebSocketState.Open) { try { string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0'); _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}"); //組裝redis頻道發(fā)布的消息,目標為群組標識 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg }; //通過redis發(fā)布消息 _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody)); receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } catch (Exception ex) { _logger.LogError(ex, ex.Message); break; } } //如果客戶端退出則在當(dāng)前群組集合刪除當(dāng)前用戶 _ = currentGroup.TryRemove(userId, out _); await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None); } private async Task SubGroupMsg(string channel) { var sub = _redisClient.Subscribe(channel, async (channel, data) => { ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}"); //在當(dāng)前WebSocket服務(wù)器找到當(dāng)前群組里的用戶 GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup); //循環(huán)當(dāng)前WebSocket服務(wù)器里的用戶發(fā)送消息 foreach (var user in currentGroup) { //不用給自己發(fā)送了 if (user.Key == msgBody.FromId) { continue; } if (user.Value.State == WebSocketState.Open) { await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } } }); //把當(dāng)前頻道加入訂閱集合 _disposables.TryAdd(channel, sub); } }
這里涉及到了GroupUser
類,是來存儲群組和群組用戶的對應(yīng)關(guān)系的,定義如下
public class GroupUser { //key為群組的唯一標識 public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>(); }
演示一下把兩個用戶添加到一個群組內(nèi),然后發(fā)送接收消息的場景,用戶u1發(fā)送
用戶u2接收
發(fā)送所有人
發(fā)送給所有用戶的邏輯比較簡單,不用考慮到用戶限制,只要用戶連接到了WebSocket集群則都可以接收到這個消息,大致工作方式如下圖所示
這個比較簡單,咱們直接看實現(xiàn)代碼,首先是定義一個地址,用于發(fā)布消息
//把用戶注冊進去 [HttpGet("/chat/all/{id}")] public async Task ChatAll(string id) { if (HttpContext.WebSockets.IsWebSocketRequest) { _logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join"); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); await _socketHandler.HandleAll(id, webSocket); } else { HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; } }
具體的實現(xiàn)邏輯還是在HandleGroup類里,是HandleAll方法,看一下具體實現(xiàn)
public class WebSocketHandler:IDisposable { private readonly UserConnection AllConnection = new(); private readonly ConcurrentDictionary<string, IDisposable> _disposables = new(); private readonly string all = "all"; private readonly ILogger<WebSocketHandler> _logger; private readonly RedisClient _redisClient; public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient) { _logger = logger; _redisClient = redisClient; } public async Task HandleAll(string id, WebSocket webSocket) { await _lock.WaitAsync(); //把用戶加入用戶集合 _ = AllConnection.GetOrAdd(id, webSocket); //WebSocket集群中的每個服務(wù)只定義一次 if (AllConnection.Count == 1) { await SubAllMsg(all); } _lock.Release(); var buffer = new byte[1024 * 4]; //阻塞接收信息 var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); while (webSocket.State == WebSocketState.Open) { try { string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0'); _logger.LogInformation($"user {id} send:{msg}"); //獲取接收信息 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg }; //把消息通過redis發(fā)布到集群中的其他服務(wù) _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody)); receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } catch (Exception ex) { _logger.LogError(ex, ex.Message); break; } } //用戶退出則刪除集合中的當(dāng)前用戶信息 _ = AllConnection.TryRemove(id, out _); await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None); } private async Task SubAllMsg(string channel) { var sub = _redisClient.Subscribe(channel, async (channel, data) => { ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}"); //接收到消息后遍歷用戶集合把消息發(fā)送給所有用戶 foreach (var user in AllConnection) { //如果包含當(dāng)前用戶跳過 if (user.Key == msgBody.FromId) { continue; } if (user.Value.State == WebSocketState.Open) { await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } } }); _disposables.TryAdd(channel, sub); } }
效果在這里就不展示了,和群組的效果是類似的,只是一個是部分用戶,一個是全部的用戶。
整合到一起
上面我們分別展示了一對一、群組、所有人的場景,但是實際使用的時候,每個用戶只需要注冊到WebSocket集群一次也就是保持一個連接即可,而不是一對一一個連接、注冊群組一個連接、所有消息的時候一個連接。所以我們需要把上面的演示整合一下,一個用戶只需要連接到WebSocket集群一次即可,至于發(fā)送給誰,加入什么群組,接收全部消息等都是連接后通過一些標識區(qū)分的,而不必每個類型的操作都注冊一次,就和微信和QQ一樣我只要登錄了即可,至于其他操作都是靠數(shù)據(jù)標識區(qū)分的。接下來咱們就整合一下代碼達到這個效果,大致的思路是
- 用戶連接到WebSocket集群,把用戶和連接保存到當(dāng)前WebSocket服務(wù)器的用戶集合中去。
- 一對一發(fā)送的時候,只需要在具體的服務(wù)器中找到具體的客戶端發(fā)送消息
- 群組的時候,先把當(dāng)前用戶標識加入群組集合即可,接收消息的時候根據(jù)群組集合里的用戶標識去用戶集合里去拿具體的WebSocket連接發(fā)送消息
- 全員消息的時候,直接遍歷集群中的每個WebSocket服務(wù)里的用戶集合里的WebSocket連接訓(xùn)話發(fā)送消息
這樣的話就保證了每個客戶端用戶在集群中只會綁定一個連接,首先還是單獨定義一個action,用于讓客戶端用戶連接上來,具體實現(xiàn)代碼如下所示
public class WebSocketChannelController : ControllerBase { private readonly ILogger<WebSocketController> _logger; private readonly WebSocketChannelHandler _webSocketChannelHandler; public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler) { _logger = logger; _webSocketChannelHandler = webSocketChannelHandler; } //只需要把當(dāng)前用戶連接到服務(wù)即可 [HttpGet("/chat/channel/{id}")] public async Task Channel(string id) { if (HttpContext.WebSockets.IsWebSocketRequest) { _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join"); var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); await _webSocketChannelHandler.HandleChannel(id, webSocket); } else { HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest; } } }
接下來看一下WebSocketChannelHandler類的HandleChannel方法實現(xiàn),用于處理不同的消息,比如一對一、群組、全員消息等不同類型的消息
public class WebSocketChannelHandler : IDisposable { //用于存儲當(dāng)前WebSocket服務(wù)器鏈接上來的所有用戶對應(yīng)關(guān)系 private readonly UserConnection UserConnection = new(); //用于存儲群組和用戶關(guān)系,用戶集合采用HashSet保證每個用戶只加入一個群組一次 private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>(); private readonly SemaphoreSlim _lock = new(1, 1); //存放redis訂閱實例 private readonly ConcurrentDictionary<string, IDisposable> _disposables = new(); //一對一redis頻道前綴 private readonly string userPrefix = "user:"; //群組redis頻道前綴 private readonly string groupPrefix = "group:"; //全員redis頻道 private readonly string all = "all"; private readonly ILogger<WebSocketHandler> _logger; private readonly RedisClient _redisClient; public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient) { _logger = logger; _redisClient = redisClient; } public async Task HandleChannel(string id, WebSocket webSocket) { await _lock.WaitAsync(); //每次連接進來就添加到用戶集合 _ = UserConnection.GetOrAdd(id, webSocket); //每個WebSocket服務(wù)實例只需要訂閱一次全員消息頻道 await SubMsg($"{userPrefix}{id}"); if (UserConnection.Count == 1) { await SubAllMsg(all); } _lock.Release(); var buffer = new byte[1024 * 4]; //接收客戶端消息 var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); while (webSocket.State == WebSocketState.Open) { try { string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0'); //讀取客戶端消息 ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg); //判斷消息類型 switch (channelData.Method) { //一對一 case "One": await HandleOne(id, channelData.MsgBody, receiveResult); break; //把用戶加入群組 case "UserGroup": await AddUserGroup(id, channelData.Group, webSocket); break; //處理群組消息 case "Group": await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody); break; //處理全員消息 default: await HandleAll(id, channelData.MsgBody); break; } receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); } catch (Exception ex) { _logger.LogError(ex, ex.Message); break; } } await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None); //在群組中移除當(dāng)前用戶 foreach (var users in GroupUser.Values) { lock (users) { users.Remove(id); } } //當(dāng)前客戶端用戶退出則移除連接 _ = UserConnection.TryRemove(id, out _); //取消用戶頻道訂閱 _disposables.Remove($"{userPrefix}{id}", out var sub); sub?.Dispose(); } public void Dispose() { foreach (var disposable in _disposables) { disposable.Value.Dispose(); } _disposables.Clear(); } }
這里涉及到了ChannelData
類是用于接收客戶端消息的類模板,具體定義如下
public class ChannelData { //消息類型 比如一對一 群組 全員 public string Method { get; set; } //群組標識 public string Group { get; set; } //消息體 public object MsgBody { get; set; } }
類中并不會包含當(dāng)前用戶信息,因為連接到當(dāng)前服務(wù)的時候已經(jīng)提供了客戶端唯一標識。結(jié)合上面的處理代碼我們可以看出,客戶端用戶連接到WebSocket實例之后,先注冊當(dāng)前用戶的redis訂閱頻道并且當(dāng)前實例僅注冊一次全員消息的redis頻道,用于處理非當(dāng)前實例注冊客戶端的一對一消息處理和全員消息處理,然后等待接收客戶端消息,根據(jù)客戶端消息的消息類型來判斷是進行一對一、群組、或者全員的消息類型處理,它的工作流程入下圖所示
由代碼和上面的流程圖可知,它根據(jù)不同的標識去處理不同類型的消息,接下來我們可以看下每種消息類型的處理方式。
一對一處理
首先是一對一的消息處理情況,看一下具體的處理邏輯,首先是一對一發(fā)布消息
private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult) { MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg)); byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}"); _logger.LogInformation($"user {id} send:{msgBody.Msg}"); //判斷目標用戶是否在當(dāng)前WebSocket服務(wù)器 if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)) { if (targetSocket.State == WebSocketState.Open) { await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None); } } else { //如果不在當(dāng)前服務(wù)器,則直接把消息發(fā)布到具體的用戶頻道去,由具體用戶去訂閱 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg }; _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody)); } }
接下來是用于處理訂閱其他用戶發(fā)送過來消息的邏輯,這個和整合之前的邏輯是一致的,在當(dāng)前服務(wù)器中找到用戶對應(yīng)的連接,發(fā)送消息
private async Task SubMsg(string channel) { var sub = _redisClient.Subscribe(channel, async (channel, data) => { ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}"); if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)) { if (targetSocket.State == WebSocketState.Open) { await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } else { _ = UserConnection.TryRemove(msgBody.FromId, out _); } } }); //把訂閱實例加入集合 _disposables.TryAdd(channel, sub); }
如果給某個用戶發(fā)送消息則可以使用如下的消息格式
{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}
Method為One代表著是私聊一對一的情況,消息體內(nèi)Id為要發(fā)送給的具體用戶標識和消息體。
群組處理
接下來看群組處理方式,這個和之前的邏輯是有出入的,首先是用戶要先加入到某個群組然后才能接收群組消息或者在群組中發(fā)送消息,之前是一個用戶對應(yīng)多個連接,整合了之后集群中每個用戶只關(guān)聯(lián)唯一的一個WebSocket連接,首先看用戶加入群組的邏輯
private async Task AddUserGroup(string user, string group, WebSocket webSocket) { //獲取群組信息 var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>()); lock (currentGroup) { //把用戶標識加入當(dāng)前組 _ = currentGroup.Add(user); } //每個組的redis頻道,在每臺WebSocket服務(wù)器實例只注冊一次訂閱 if (currentGroup.Count == 1) { //訂閱當(dāng)前組消息 await SubGroupMsg($"{groupPrefix}{group}"); } string addMsg = $"user 【{user}】 add to group 【{group}】"; byte[] sendByte = Encoding.UTF8.GetBytes(addMsg); await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); //如果有用戶加入群組,則通知其他群成員 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg }; _redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody)); }
用戶想要在群組內(nèi)發(fā)消息,則必須先加入到一個具體的群組內(nèi),具體的加入群組的格式如下
{"Method":"UserGroup", "Group":"g1"}
Method為UserGroup代表著用戶加入群組的業(yè)務(wù)類型,Group代表著你要加入的群組唯一標識。接下來就看下,用戶發(fā)送群組消息的邏輯了
private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody) { //判斷群組是否存在 var hasValue = GroupUser.TryGetValue(groupId, out var users); if (!hasValue) { byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists"); await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); return; } //只有加入到當(dāng)前群組,才能在群組內(nèi)發(fā)送消息 if (!users.Contains(userId)) { byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】"); await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); return; } _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}"); //發(fā)送群組消息 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() }; _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody)); }
加入群組之后則可以發(fā)送和接收群組內(nèi)的消息了,給群組發(fā)送消息的格式如下
{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}
Method為Group代表著用戶加入群組的業(yè)務(wù)類型,Group則代表你要發(fā)送到具體的群組的唯一標識,MsgBody則是發(fā)送到群組內(nèi)的消息。最后再來看下訂閱群組內(nèi)消息的情況,也就是處理群組消息的邏輯
private async Task SubGroupMsg(string channel) { var sub = _redisClient.Subscribe(channel, async (channel, data) => { //接收群組訂閱消息 ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}"); //獲取當(dāng)前服務(wù)器實例中當(dāng)前群組的所有用戶連接 GroupUser.TryGetValue(msgBody.ToId, out var currentGroup); foreach (var user in currentGroup) { if (user == msgBody.FromId) { continue; } //通過群組內(nèi)的用戶標識去用戶集合獲取用戶集合里的用戶唯一連接發(fā)送消息 if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open) { await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } else { currentGroup.Remove(user); } } }); _disposables.TryAdd(channel, sub); }
全員消息處理
全員消息處理相對來說思路比較簡單,因為當(dāng)服務(wù)啟動的時候就會監(jiān)聽redis的全員消息頻道,這樣的話具體的實現(xiàn)也就只包含發(fā)送和接收全員消息了,首先看一下全員消息發(fā)送的邏輯
private async Task HandleAll(string id, object msgBody) { _logger.LogInformation($"user {id} send:{msgBody}"); //直接給redis的全員頻道發(fā)送消息 ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() }; _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody)); }
全員消息的發(fā)送數(shù)據(jù)格式如下所示
{"Method":"All", "MsgBody":"Hello All"}
Method為All代表著全員消息類型,MsgBody則代表著具體消息。接收消息出里同樣很簡單,訂閱redis全員消息頻道,然后遍歷當(dāng)前WebSocket服務(wù)器實例內(nèi)的所有用戶獲取連接發(fā)送消息,具體邏輯如下
private async Task SubAllMsg(string channel) { var sub = _redisClient.Subscribe(channel, async (channel, data) => { ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString()); byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}"); //獲取當(dāng)前服務(wù)器實例內(nèi)所有用戶的連接 foreach (var user in UserConnection) { //不給自己發(fā)送消息,因為發(fā)送的時候可以通過具體的業(yè)務(wù)代碼處理 if (user.Key == msgBody.FromId) { continue; } //給每個用戶發(fā)送消息 if (user.Value.State == WebSocketState.Open) { await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None); } else { _ = UserConnection.TryRemove(user.Key, out _); } } }); _disposables.TryAdd(channel, sub); }
示例源碼
由于篇幅有限,沒辦法設(shè)計到全部的相關(guān)源碼,因此在這里貼出來github
相關(guān)的地址,方便大家查看和運行源碼。相關(guān)的源碼我這里實現(xiàn)了兩個版本,一個是基于asp.net core的版本,一個是基于golang的版本。兩份源碼的實現(xiàn)思路是一致的,所以這兩份代碼可以運行在一套集群示例里,配置在一套nginx里,并且連接到同一個redis實例里即可
asp.net core
源碼示例https://github.com/softlgl/WebsocketClustergolang
源碼示例https://github.com/softlgl/websocket-cluster
倉庫里還涉及到本人閑暇之余開源的其他倉庫,由于本人能力有限難登大雅之堂,就不做廣告了,有興趣的同學(xué)可以自行瀏覽一下。
總結(jié)
本文基于ASP.NET Core
框架提供了一個基于WebSocket
做集群的示例,由于思想是通用的,所以基于這個思路樓主也實現(xiàn)了golang
版本。其實在之前就想自己動手搞一搞關(guān)于WebSocket集群方面的設(shè)計,本篇文章算是對之前想法的一個落地操作。其核心思路文章已經(jīng)做了相關(guān)介紹,由于這些只是博主關(guān)于構(gòu)思的實現(xiàn),可能有很多細節(jié)尚未體現(xiàn)到,還希望大家多多理解。其核心思路總結(jié)一下
- 首先是,利用可以構(gòu)建WebSocket服務(wù)的框架,在當(dāng)前服務(wù)實例中保存當(dāng)前客戶端用戶和WebSocket的連接關(guān)系
- 如果消息的目標客戶端不在當(dāng)前服務(wù)器,可以利用redis頻道、消息隊列相關(guān)、甚至是數(shù)據(jù)庫類的共享回話發(fā)送的消息,由目標服務(wù)器獲取目標是否屬于自己的ws會話
- 本文設(shè)計的思路使用的是無狀態(tài)的方式,即WebSocket服務(wù)實例之間不存在直接的消息通信和相互的服務(wù)地址存儲,當(dāng)然也可以利用redis等存儲在線用戶信息等,這個可以參考具體業(yè)務(wù)自行設(shè)計
讀萬卷書,行萬里路。在這個時刻都在變化點的環(huán)境里,唯有不斷的進化自己,多接觸多嘗試不用的事物,多擴展自己的認知思維,方能構(gòu)建自己的底層邏輯。畢竟越底層越抽象,越通用越抽象。面對未知的挑戰(zhàn),自身作為自己堅強的后盾,可能才會讓自己更踏實。
以上就是ASP.NET Core WebSocket集群實現(xiàn)思路詳解的詳細內(nèi)容,更多關(guān)于ASP.NET Core WebSocket的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
ASP.NET Core如何注入多個服務(wù)實現(xiàn)類
這篇文章主要介紹了ASP.NET Core如何注入多個服務(wù)實現(xiàn)類的相關(guān)資料,需要的朋友可以參考下面文章的具體內(nèi)容2021-09-09利用ASP.NET技術(shù)動態(tài)生成HTML頁面
利用ASP.NET技術(shù)動態(tài)生成HTML頁面...2006-07-07利用.NET 開發(fā)服務(wù)器 應(yīng)用管理工具
這篇文章主要介紹如何利用.NET 開發(fā)一個應(yīng)用管理工具的服務(wù)器,文章回先聊背景接著其是喲美好方法,需要的的小伙伴可以參考一下小面文章的具體內(nèi)容2021-10-10在?ASP.NET?Core?中為?gRPC?服務(wù)添加全局異常處理
這篇文章主要介紹了在?ASP.NET?Core?中為?gRPC?服務(wù)添加全局異常處理?,在?ASP.NET?Core?中使用?GRPC.ASPNETCore?工具包寫?gRPC?服務(wù),想實現(xiàn)?gRPC?的異常全局攔截,下面一起來看看文中的詳細內(nèi)容吧2022-01-01.NET??Smobiler的復(fù)雜控件的由來與創(chuàng)造
這篇文章主要介紹了.NET Smobiler的復(fù)雜控件的由來與創(chuàng)造,Smobiler的復(fù)雜控件即利用自定義控件的方式組合控件,來使控件成為一個有機整體,里面的控件可相互協(xié)作交互,并使其達到高可用2022-08-08