欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包

 更新時(shí)間:2023年12月27日 08:19:53   作者:rjcql  
這篇文章主要為大家詳細(xì)介紹了C#如何使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

寫在前面

在上一篇中對(duì)Pipelines進(jìn)行簡(jiǎn)單的了解,同時(shí)也留下了未解的問題,如何將Pipelines類庫(kù)運(yùn)用到Socket通訊過程中來解決粘包和分包。鏈接地址如下:C#中System.IO.Pipelines庫(kù)的使用詳解

這一篇做了一個(gè)完整的demo,使用Pipelines接收和處理來自多個(gè)客戶端發(fā)出的消息;相對(duì)于以往在報(bào)文包頭放包體長(zhǎng)度再結(jié)合結(jié)束符來判斷的方式,確實(shí)要簡(jiǎn)潔了許多。

代碼實(shí)現(xiàn)

服務(wù)端實(shí)現(xiàn)

using System.Net.Sockets;
using System.Net;
using System.Text;
 
class Program
{
    static async Task Main()
    {
        SocketServerForPiplines();
    }
 
    static async void SocketServerForPiplines()
    {
        Console.WriteLine("Socket Server");
 
        // 創(chuàng)建服務(wù)端Socket對(duì)象
        var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        serverSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9090));
        serverSocket.ReceiveTimeout = 1000;
        serverSocket.SendTimeout = 1000;
        serverSocket.Listen(1000);
        Console.WriteLine("服務(wù)端啟動(dòng)監(jiān)聽");
 
        while (true)
        {
            var clientSocket = serverSocket.Accept();
 
            Console.WriteLine("有客戶端連上了");
 
            var handler = new PiplinesHandler(clientSocket);
            await handler.StartReceiveAsync();
        }
 
        Console.ReadLine();
    }
}

PiplinesHandler類

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
 
namespace PipelinesTester
{
    public class PiplinesHandler
    {
        private const int _minimumBufferSize = 512;
        private Socket _socket;
        private Pipe _pipe;
 
        public PiplinesHandler(Socket socket)
        {
            _socket = socket;
            var options = new PipeOptions(pauseWriterThreshold: 4096, resumeWriterThreshold: 1024);
            _pipe = new Pipe(options);
        }
 
        public async Task StartReceiveAsync()
        {
            Task receiveTask = ReceiveMessageAsync();
            Task processTask = ProcessMessageAsync();
 
            await Task.WhenAll(receiveTask, processTask);
        }
 
 
        private async Task ReceiveMessageAsync()
        {
            PipeWriter writer = _pipe.Writer;
 
            while (true)
            {
                try
                {
                    //從writer申請(qǐng)緩沖區(qū)
                    Memory<byte> memory = writer.GetMemory(_minimumBufferSize);
                    //從socket讀取數(shù)據(jù),直接寫入到緩沖區(qū)中,即直接寫入了PipeWriter中        
                    int bytesRead = await _socket.ReceiveAsync(memory, SocketFlags.None);
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    //前移寫標(biāo)志位
                    writer.Advance(bytesRead);
                    //通知Reader,可以讀取了
                    var result = await writer.FlushAsync();
 
                    if (result.IsCompleted)
                        break;
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                    break;
                }
 
            }
 
            await writer.CompleteAsync();
 
            try
            {
                _socket.Shutdown(SocketShutdown.Both);
                _socket.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
 
        private async Task ProcessMessageAsync()
        {
            PipeReader _pipeReader = _pipe.Reader;
 
            while (true)
            {
                //讀取消息
                var result = await _pipeReader.ReadAsync();
                var buffer = result.Buffer;
                //查找結(jié)束符            
                SequencePosition? position = buffer.PositionOf((byte)'\n');
 
                if (position == null)
                {
                    continue;
                }
 
                // 處理消息
                var line = buffer.Slice(0, position.Value);
                string msg = Encoding.UTF8.GetString(line);
                Console.WriteLine(msg);
 
                // 前移PipeReader
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                _pipeReader.AdvanceTo(buffer.Start, buffer.End);
 
                // Stop reading if there's no more data coming.
                if (result.IsCompleted)
                {
                    break;
                }
            }
 
            await _pipeReader.CompleteAsync();
        }
    }
}

客戶端實(shí)現(xiàn)

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
 
class Program
{
    static void Main(string[] args)
    {
        TcpClientTest();
    }
 
    static void TcpClientTest()
    {
        Console.WriteLine("TcpClient");
 
        var msg = $"這是來自客戶端的消息{DateTime.Now.ToString("yyyy-MM-dd:HH:mm:ss")}\n";
        var client = new TcpClient("127.0.0.1", 9090);
        var sendStream = client.GetStream();
        var sendBytes = Encoding.Default.GetBytes(msg);
        sendStream.Write(sendBytes, 0, sendBytes.Length);
        sendStream.Flush();
        sendStream.Close();//關(guān)閉網(wǎng)絡(luò)流  
        client.Close();//關(guān)閉客戶端  
 
        Console.WriteLine(msg);
 
        Console.ReadLine();
    }
}

調(diào)用示例

到此這篇關(guān)于C#使用Pipelines實(shí)現(xiàn)處理Socket數(shù)據(jù)包的文章就介紹到這了,更多相關(guān)C# Pipelines處理Socket內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論