C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包
寫(xiě)在前面
在上一篇中對(duì)Pipelines進(jìn)行簡(jiǎn)單的了解,同時(shí)也留下了未解的問(wèn)題,如何將Pipelines類(lèi)庫(kù)運(yùn)用到Socket通訊過(guò)程中來(lái)解決粘包和分包。鏈接地址如下:C#中System.IO.Pipelines庫(kù)的使用詳解
這一篇做了一個(gè)完整的demo,使用Pipelines接收和處理來(lái)自多個(gè)客戶(hù)端發(fā)出的消息;相對(duì)于以往在報(bào)文包頭放包體長(zhǎng)度再結(jié)合結(jié)束符來(lái)判斷的方式,確實(shí)要簡(jiǎn)潔了許多。
代碼實(shí)現(xiàn)
服務(wù)端實(shí)現(xiàn)
using System.Net.Sockets;
using System.Net;
using System.Text;
class Program
{
static async Task Main()
{
SocketServerForPiplines();
}
static async void SocketServerForPiplines()
{
Console.WriteLine("Socket Server");
// 創(chuàng)建服務(wù)端Socket對(duì)象
var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
serverSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9090));
serverSocket.ReceiveTimeout = 1000;
serverSocket.SendTimeout = 1000;
serverSocket.Listen(1000);
Console.WriteLine("服務(wù)端啟動(dòng)監(jiān)聽(tīng)");
while (true)
{
var clientSocket = serverSocket.Accept();
Console.WriteLine("有客戶(hù)端連上了");
var handler = new PiplinesHandler(clientSocket);
await handler.StartReceiveAsync();
}
Console.ReadLine();
}
}PiplinesHandler類(lèi)
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace PipelinesTester
{
public class PiplinesHandler
{
private const int _minimumBufferSize = 512;
private Socket _socket;
private Pipe _pipe;
public PiplinesHandler(Socket socket)
{
_socket = socket;
var options = new PipeOptions(pauseWriterThreshold: 4096, resumeWriterThreshold: 1024);
_pipe = new Pipe(options);
}
public async Task StartReceiveAsync()
{
Task receiveTask = ReceiveMessageAsync();
Task processTask = ProcessMessageAsync();
await Task.WhenAll(receiveTask, processTask);
}
private async Task ReceiveMessageAsync()
{
PipeWriter writer = _pipe.Writer;
while (true)
{
try
{
//從writer申請(qǐng)緩沖區(qū)
Memory<byte> memory = writer.GetMemory(_minimumBufferSize);
//從socket讀取數(shù)據(jù),直接寫(xiě)入到緩沖區(qū)中,即直接寫(xiě)入了PipeWriter中
int bytesRead = await _socket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
//前移寫(xiě)標(biāo)志位
writer.Advance(bytesRead);
//通知Reader,可以讀取了
var result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
catch (Exception e)
{
Console.WriteLine(e);
break;
}
}
await writer.CompleteAsync();
try
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
private async Task ProcessMessageAsync()
{
PipeReader _pipeReader = _pipe.Reader;
while (true)
{
//讀取消息
var result = await _pipeReader.ReadAsync();
var buffer = result.Buffer;
//查找結(jié)束符
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position == null)
{
continue;
}
// 處理消息
var line = buffer.Slice(0, position.Value);
string msg = Encoding.UTF8.GetString(line);
Console.WriteLine(msg);
// 前移PipeReader
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
_pipeReader.AdvanceTo(buffer.Start, buffer.End);
// Stop reading if there's no more data coming.
if (result.IsCompleted)
{
break;
}
}
await _pipeReader.CompleteAsync();
}
}
}客戶(hù)端實(shí)現(xiàn)
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
class Program
{
static void Main(string[] args)
{
TcpClientTest();
}
static void TcpClientTest()
{
Console.WriteLine("TcpClient");
var msg = $"這是來(lái)自客戶(hù)端的消息{DateTime.Now.ToString("yyyy-MM-dd:HH:mm:ss")}\n";
var client = new TcpClient("127.0.0.1", 9090);
var sendStream = client.GetStream();
var sendBytes = Encoding.Default.GetBytes(msg);
sendStream.Write(sendBytes, 0, sendBytes.Length);
sendStream.Flush();
sendStream.Close();//關(guān)閉網(wǎng)絡(luò)流
client.Close();//關(guān)閉客戶(hù)端
Console.WriteLine(msg);
Console.ReadLine();
}
}調(diào)用示例

到此這篇關(guān)于C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包的文章就介紹到這了,更多相關(guān)C# Pipelines處理Socket內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#中的delegate委托類(lèi)型基本學(xué)習(xí)教程
這篇文章主要介紹了C#中的delegate委托類(lèi)型基本學(xué)習(xí)教程,委托是C#語(yǔ)言所具有的一個(gè)重要特性,需要的朋友可以參考下2016-01-01
C# 使用CancellationTokenSource取消多線程
有時(shí)間我們?cè)谑褂枚嗑€程的時(shí)候,需要取消線程的執(zhí)行,可以使用CancellationTokenSource來(lái)取消對(duì)Task開(kāi)辟多線程的取消,感興趣的可以了解一下2021-08-08
C#的Process類(lèi)調(diào)用第三方插件實(shí)現(xiàn)PDF文件轉(zhuǎn)SWF文件
本篇文章主要介紹了C#的Process類(lèi)調(diào)用第三方插件實(shí)現(xiàn)PDF文件轉(zhuǎn)SWF文件,現(xiàn)在分享給大家,具有一定的參考價(jià)值,有需要的可以了解一下。2016-11-11
通過(guò)?C#/VB.NET?代碼將?Excel?工作表拆分為單獨(dú)的文件
這篇文章主要介紹了通過(guò)C#/VB.NET代碼將Excel工作表拆分為單獨(dú)的文件,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09
c#根據(jù)文件類(lèi)型獲取相關(guān)類(lèi)型圖標(biāo)的方法代碼
c#根據(jù)文件類(lèi)型獲取相關(guān)類(lèi)型圖標(biāo)的方法代碼,需要的朋友可以參考一下2013-05-05
Unity實(shí)現(xiàn)簡(jiǎn)單虛擬搖桿
這篇文章主要為大家詳細(xì)介紹了Unity實(shí)現(xiàn)簡(jiǎn)單虛擬搖桿,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-04-04

