C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包
寫在前面
在上一篇中對(duì)Pipelines進(jìn)行簡(jiǎn)單的了解,同時(shí)也留下了未解的問題,如何將Pipelines類庫(kù)運(yùn)用到Socket通訊過程中來解決粘包和分包。鏈接地址如下:C#中System.IO.Pipelines庫(kù)的使用詳解
這一篇做了一個(gè)完整的demo,使用Pipelines接收和處理來自多個(gè)客戶端發(fā)出的消息;相對(duì)于以往在報(bào)文包頭放包體長(zhǎng)度再結(jié)合結(jié)束符來判斷的方式,確實(shí)要簡(jiǎn)潔了許多。
代碼實(shí)現(xiàn)
服務(wù)端實(shí)現(xiàn)
using System.Net.Sockets; using System.Net; using System.Text; class Program { static async Task Main() { SocketServerForPiplines(); } static async void SocketServerForPiplines() { Console.WriteLine("Socket Server"); // 創(chuàng)建服務(wù)端Socket對(duì)象 var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); serverSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9090)); serverSocket.ReceiveTimeout = 1000; serverSocket.SendTimeout = 1000; serverSocket.Listen(1000); Console.WriteLine("服務(wù)端啟動(dòng)監(jiān)聽"); while (true) { var clientSocket = serverSocket.Accept(); Console.WriteLine("有客戶端連上了"); var handler = new PiplinesHandler(clientSocket); await handler.StartReceiveAsync(); } Console.ReadLine(); } }
PiplinesHandler類
using System; using System.Buffers; using System.Collections.Generic; using System.IO.Pipelines; using System.Linq; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; namespace PipelinesTester { public class PiplinesHandler { private const int _minimumBufferSize = 512; private Socket _socket; private Pipe _pipe; public PiplinesHandler(Socket socket) { _socket = socket; var options = new PipeOptions(pauseWriterThreshold: 4096, resumeWriterThreshold: 1024); _pipe = new Pipe(options); } public async Task StartReceiveAsync() { Task receiveTask = ReceiveMessageAsync(); Task processTask = ProcessMessageAsync(); await Task.WhenAll(receiveTask, processTask); } private async Task ReceiveMessageAsync() { PipeWriter writer = _pipe.Writer; while (true) { try { //從writer申請(qǐng)緩沖區(qū) Memory<byte> memory = writer.GetMemory(_minimumBufferSize); //從socket讀取數(shù)據(jù),直接寫入到緩沖區(qū)中,即直接寫入了PipeWriter中 int bytesRead = await _socket.ReceiveAsync(memory, SocketFlags.None); if (bytesRead == 0) { break; } //前移寫標(biāo)志位 writer.Advance(bytesRead); //通知Reader,可以讀取了 var result = await writer.FlushAsync(); if (result.IsCompleted) break; } catch (Exception e) { Console.WriteLine(e); break; } } await writer.CompleteAsync(); try { _socket.Shutdown(SocketShutdown.Both); _socket.Close(); } catch (Exception e) { Console.WriteLine(e); } } private async Task ProcessMessageAsync() { PipeReader _pipeReader = _pipe.Reader; while (true) { //讀取消息 var result = await _pipeReader.ReadAsync(); var buffer = result.Buffer; //查找結(jié)束符 SequencePosition? position = buffer.PositionOf((byte)'\n'); if (position == null) { continue; } // 處理消息 var line = buffer.Slice(0, position.Value); string msg = Encoding.UTF8.GetString(line); Console.WriteLine(msg); // 前移PipeReader buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); _pipeReader.AdvanceTo(buffer.Start, buffer.End); // Stop reading if there's no more data coming. if (result.IsCompleted) { break; } } await _pipeReader.CompleteAsync(); } } }
客戶端實(shí)現(xiàn)
using System; using System.Net; using System.Net.Sockets; using System.Text; class Program { static void Main(string[] args) { TcpClientTest(); } static void TcpClientTest() { Console.WriteLine("TcpClient"); var msg = $"這是來自客戶端的消息{DateTime.Now.ToString("yyyy-MM-dd:HH:mm:ss")}\n"; var client = new TcpClient("127.0.0.1", 9090); var sendStream = client.GetStream(); var sendBytes = Encoding.Default.GetBytes(msg); sendStream.Write(sendBytes, 0, sendBytes.Length); sendStream.Flush(); sendStream.Close();//關(guān)閉網(wǎng)絡(luò)流 client.Close();//關(guān)閉客戶端 Console.WriteLine(msg); Console.ReadLine(); } }
調(diào)用示例
到此這篇關(guān)于C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包的文章就介紹到這了,更多相關(guān)C# Pipelines處理Socket內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#中的delegate委托類型基本學(xué)習(xí)教程
這篇文章主要介紹了C#中的delegate委托類型基本學(xué)習(xí)教程,委托是C#語(yǔ)言所具有的一個(gè)重要特性,需要的朋友可以參考下2016-01-01C# 使用CancellationTokenSource取消多線程
有時(shí)間我們?cè)谑褂枚嗑€程的時(shí)候,需要取消線程的執(zhí)行,可以使用CancellationTokenSource來取消對(duì)Task開辟多線程的取消,感興趣的可以了解一下2021-08-08C#的Process類調(diào)用第三方插件實(shí)現(xiàn)PDF文件轉(zhuǎn)SWF文件
本篇文章主要介紹了C#的Process類調(diào)用第三方插件實(shí)現(xiàn)PDF文件轉(zhuǎn)SWF文件,現(xiàn)在分享給大家,具有一定的參考價(jià)值,有需要的可以了解一下。2016-11-11通過?C#/VB.NET?代碼將?Excel?工作表拆分為單獨(dú)的文件
這篇文章主要介紹了通過C#/VB.NET代碼將Excel工作表拆分為單獨(dú)的文件,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09c#根據(jù)文件類型獲取相關(guān)類型圖標(biāo)的方法代碼
c#根據(jù)文件類型獲取相關(guān)類型圖標(biāo)的方法代碼,需要的朋友可以參考一下2013-05-05Unity實(shí)現(xiàn)簡(jiǎn)單虛擬搖桿
這篇文章主要為大家詳細(xì)介紹了Unity實(shí)現(xiàn)簡(jiǎn)單虛擬搖桿,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-04-04