C#處理TCP數(shù)據(jù)的方法詳解
前言
Tcp是一個(gè)面向連接的流數(shù)據(jù)傳輸協(xié)議,用人話說(shuō)就是傳輸是一個(gè)已經(jīng)建立好連接的管道,數(shù)據(jù)都在管道里像流水一樣流淌到對(duì)端。
那么數(shù)據(jù)必然存在幾個(gè)問(wèn)題,比如數(shù)據(jù)如何持續(xù)的讀取,數(shù)據(jù)包的邊界等。
Nagle's算法
Nagle 算法的核心思想是,在一個(gè) TCP 連接上,最多只能有一個(gè)未被確認(rèn)的小數(shù)據(jù)包(小于 MSS,即最大報(bào)文段大?。?/p>
優(yōu)勢(shì)
減少網(wǎng)絡(luò)擁塞:通過(guò)合并小數(shù)據(jù)包,減少了網(wǎng)絡(luò)中的數(shù)據(jù)包數(shù)量,降低了擁塞的可能性。
提高網(wǎng)絡(luò)效率:在低速網(wǎng)絡(luò)中,Nagle 算法可以顯著提高傳輸效率。
劣勢(shì)
增加延遲:在交互式應(yīng)用中,Nagle 算法可能導(dǎo)致顯著的延遲,因?yàn)樗却?ACK 或合并數(shù)據(jù)包。
C#中如何配置?
var _socket = new Socket(IPAddress.Any.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _serverSocket.NoDelay = _options.NoDelay;
連接超時(shí)
在調(diào)用客戶端Socket連接服務(wù)器的時(shí)候,可以設(shè)置連接超時(shí)機(jī)制,具體可以傳入一個(gè)任務(wù)的取消令牌,并且設(shè)置超時(shí)時(shí)間。
CancellationTokenSource connectTokenSource = new CancellationTokenSource(); connectTokenSource.CancelAfter(3000); //3秒 await _socket.ConnectAsync(RemoteEndPoint, connectTokenSource.Token);
SSL加密傳輸
TCP使用SSL加密傳輸,通過(guò)非對(duì)稱加密的方式,利用證書(shū),保證雙方使用了安全的密鑰加密了報(bào)文。在C#中如何配置?
服務(wù)端配置
//創(chuàng)建證書(shū)對(duì)象 var _certificate = _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword); //與客戶端進(jìn)行驗(yàn)證 if (allowingUntrustedSSLCertificate) //是否允許不受信任的證書(shū) { SslStream = new SslStream(NetworkStream, false, (obj, certificate, chain, error) => true); } else { SslStream = new SslStream(NetworkStream, false); } try { //serverCertificate:用于對(duì)服務(wù)器進(jìn)行身份驗(yàn)證的 X509Certificate //clientCertificateRequired:一個(gè) Boolean 值,指定客戶端是否必須為身份驗(yàn)證提供證書(shū) //checkCertificateRevocation:一個(gè) Boolean 值,指定在身份驗(yàn)證過(guò)程中是否檢查證書(shū)吊銷列表 await SslStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions() { ServerCertificate = x509Certificate, ClientCertificateRequired = mutuallyAuthenticate, CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck }, cancellationToken).ConfigureAwait(false); if (!SslStream.IsEncrypted || !SslStream.IsAuthenticated) { return false; } if (mutuallyAuthenticate && !SslStream.IsMutuallyAuthenticated) { return false; } } catch (Exception) { throw; } //完成驗(yàn)證后,通過(guò)SslStream傳輸數(shù)據(jù) int readCount = await SslStream.ReadAsync(buffer, _lifecycleTokenSource.Token) .ConfigureAwait(false);
客戶端配置
var _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword); if (_options.IsSsl) //如果使用ssl加密傳輸 { if (_options.AllowingUntrustedSSLCertificate)//是否允許不受信任的證書(shū) { _sslStream = new SslStream(_networkStream, false, (obj, certificate, chain, error) => true); } else { _sslStream = new SslStream(_networkStream, false); } _sslStream.ReadTimeout = _options.ReadTimeout; _sslStream.WriteTimeout = _options.WriteTimeout; await _sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions() { TargetHost = RemoteEndPoint.Address.ToString(), EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12, CertificateRevocationCheckMode = _options.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck, ClientCertificates = new X509CertificateCollection() { _certificate } }, connectTokenSource.Token).ConfigureAwait(false); if (!_sslStream.IsEncrypted || !_sslStream.IsAuthenticated || (_options.MutuallyAuthenticate && !_sslStream.IsMutuallyAuthenticated)) { throw new InvalidOperationException("SSL authenticated faild!"); } }
KeepAlive
keepAlive不是TCP協(xié)議中的,而是各個(gè)操作系統(tǒng)本身實(shí)現(xiàn)的功能,主要是防止一些Socket突然斷開(kāi)后沒(méi)有被感知到,導(dǎo)致一直浪費(fèi)資源的情況。
/// <summary> /// 開(kāi)啟Socket的KeepAlive /// 設(shè)置tcp協(xié)議的一些KeepAlive參數(shù) /// </summary> /// <param name="socket"></param> /// <param name="tcpKeepAliveInterval">沒(méi)有接收到對(duì)方確認(rèn),繼續(xù)發(fā)送KeepAlive的發(fā)送頻率</param> /// <param name="tcpKeepAliveTime">KeepAlive的空閑時(shí)長(zhǎng),或者說(shuō)每次正常發(fā)送心跳的周期</param> /// <param name="tcpKeepAliveRetryCount">KeepAlive之后設(shè)置最大允許發(fā)送?;钐綔y(cè)包的次數(shù),到達(dá)此次數(shù)后直接放棄嘗試,并關(guān)閉連接</param> internal static void SetKeepAlive(this Socket socket, int tcpKeepAliveInterval, int tcpKeepAliveTime, int tcpKeepAliveRetryCount) { socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, tcpKeepAliveInterval); socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, tcpKeepAliveTime); socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, tcpKeepAliveRetryCount); }
具體的開(kāi)啟,還需要看操作系統(tǒng)的版本以及不同操作系統(tǒng)的支持。
粘包斷包處理
Pipe & ReadOnlySequence
TCP面向應(yīng)用是流式數(shù)據(jù)傳輸,所以接收端接到的數(shù)據(jù)是像流水一樣從管道中傳來(lái),每次取到的數(shù)據(jù)取決于應(yīng)用設(shè)置的緩沖區(qū)大小,以及套接字本身緩沖區(qū)待讀取字節(jié)數(shù)。
C#中提供的Pipe就如上圖一樣,是一個(gè)管道Pipe有兩個(gè)對(duì)象成員,一個(gè)是PipeWriter,一個(gè)是PipeReader,可以理解為一個(gè)是生產(chǎn)者,專門(mén)往管道里灌輸數(shù)據(jù)流,即字節(jié)流,一個(gè)是消費(fèi)者,專門(mén)從管道里獲取字節(jié)流進(jìn)行處理。
可以看到Pipe中的數(shù)據(jù)包是用鏈表關(guān)聯(lián)的,但是這個(gè)數(shù)據(jù)包是從Socke緩沖區(qū)每次取到的數(shù)據(jù)包,它不一定是一個(gè)完整的數(shù)據(jù)包,所以這些數(shù)據(jù)包連接起來(lái)后形成了一個(gè)C#提供的另外一個(gè)抽象的對(duì)象ReadOnlySequence。
但是這里還是沒(méi)有提供太好的處理斷包和粘包的辦法,因?yàn)閿喟嘲奶幚硇枰獌煞矫?/p>
1、業(yè)務(wù)數(shù)據(jù)包的定義
2、數(shù)據(jù)流切割出一個(gè)個(gè)完整的數(shù)據(jù)包
假設(shè)業(yè)務(wù)已經(jīng)定義好了數(shù)據(jù)包,那么我們?nèi)绾螐腜ipe中這些數(shù)據(jù)包根據(jù)業(yè)務(wù)定義來(lái)從不同的數(shù)據(jù)包中切割出一個(gè)完整的包,那么就需要ReadOnlySequence,它提供的操作方法,非常方便我們?nèi)デ懈顢?shù)據(jù),主要是頭尾數(shù)據(jù)包的切割。
假設(shè)我們業(yè)務(wù)層定義了一個(gè)數(shù)據(jù)包結(jié)構(gòu),數(shù)據(jù)包是不定長(zhǎng)的,包體長(zhǎng)度每次都寫(xiě)在包頭里,我們來(lái)實(shí)現(xiàn)一個(gè)數(shù)據(jù)包過(guò)濾器。
//收到消息 while (!_receiveDataTokenSource.Token.IsCancellationRequested) { try { //從pipe中獲取緩沖區(qū) Memory<byte> buffer = _pipeWriter.GetMemory(_options.BufferSize); int readCount = 0; readCount = await _sslStream.ReadAsync(buffer, _lifecycleTokenSource.Token).ConfigureAwait(false); if (readCount > 0) { var data = buffer.Slice(0, readCount); //告知消費(fèi)者,往Pipe的管道中寫(xiě)入了多少字節(jié)數(shù)據(jù) _pipeWriter.Advance(readCount); } else { if (IsDisconnect()) { await DisConnectAsync(); } throw new SocketException(); } FlushResult result = await _pipeWriter.FlushAsync().ConfigureAwait(false); if (result.IsCompleted) { break; } } catch (IOException) { //TODO log break; } catch (SocketException) { //TODO log break; } catch (TaskCanceledException) { //TODO log break; } } _pipeWriter.Complete(); //消費(fèi)者處理數(shù)據(jù) while (!_lifecycleTokenSource.Token.IsCancellationRequested) { ReadResult result = await _pipeReader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; ReadOnlySequence<byte> data; do { //通過(guò)過(guò)濾器得到一個(gè)完整的包 data = _receivePackageFilter.ResolvePackage(ref buffer); if (!data.IsEmpty) { OnReceivedData?.Invoke(this, new ClientDataReceiveEventArgs(data.ToArray())); } } while (!data.IsEmpty && buffer.Length > 0); _pipeReader.AdvanceTo(buffer.Start); } _pipeReader.Complete(); /// <summary> /// 解析數(shù)據(jù)包 /// 固定報(bào)文頭解析協(xié)議 /// </summary> /// <param name="headerSize">數(shù)據(jù)報(bào)文頭的大小</param> /// <param name="bodyLengthIndex">數(shù)據(jù)包大小在報(bào)文頭中的位置</param> /// <param name="bodyLengthBytes">數(shù)據(jù)包大小在報(bào)文頭中的長(zhǎng)度</param> /// <param name="IsLittleEndian">數(shù)據(jù)報(bào)文大小端。windows中通常是小端,unix通常是大端模式</param> /// </summary> /// <param name="sequence">一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包</param> public override ReadOnlySequence<byte> ResolvePackage(ref ReadOnlySequence<byte> sequence) { var len = sequence.Length; if (len < _bodyLengthIndex) return default; var bodyLengthSequence = sequence.Slice(_bodyLengthIndex, _bodyLengthBytes); byte[] bodyLengthBytes = ArrayPool<byte>.Shared.Rent(_bodyLengthBytes); try { int index = 0; foreach (var item in bodyLengthSequence) { Array.Copy(item.ToArray(), 0, bodyLengthBytes, index, item.Length); index += item.Length; } long bodyLength = 0; int offset = 0; if (!_isLittleEndian) { offset = bodyLengthBytes.Length - 1; foreach (var bytes in bodyLengthBytes) { bodyLength += bytes << (offset * 8); offset--; } } else { foreach (var bytes in bodyLengthBytes) { bodyLength += bytes << (offset * 8); offset++; } } if (sequence.Length < _headerSize + bodyLength) return default; var endPosition = sequence.GetPosition(_headerSize + bodyLength); var data = sequence.Slice(0, endPosition);//得到完整數(shù)據(jù)包 sequence = sequence.Slice(endPosition);//緩沖區(qū)中去除取到的完整包 return data; } finally { ArrayPool<byte>.Shared.Return(bodyLengthBytes); } }
以上就是實(shí)現(xiàn)了固定數(shù)據(jù)包頭實(shí)現(xiàn)粘包斷包處理的部分代碼。
關(guān)于TCP的連接還有一些,比如客戶端連接限制,空閑連接關(guān)閉等。
到此這篇關(guān)于C#處理TCP數(shù)據(jù)的方法詳解的文章就介紹到這了,更多相關(guān)C#處理TCP數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#連接SQL Server的實(shí)現(xiàn)方法
這篇文章主要給大家介紹了關(guān)于C#連接SQL Server的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12Unity實(shí)現(xiàn)動(dòng)物識(shí)別的示例代碼
本文主要介紹了如何通過(guò)Unity實(shí)現(xiàn)動(dòng)物識(shí)別,可以實(shí)現(xiàn)識(shí)別近八千種動(dòng)物,接口返回動(dòng)物名稱,并可獲取識(shí)別結(jié)果對(duì)應(yīng)的百科信息,感興趣的可以了解一下2022-02-02C#創(chuàng)建Windows服務(wù)與服務(wù)的安裝、卸載
這篇文章介紹了C#創(chuàng)建Windows服務(wù)與服務(wù)的安裝、卸載,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02C#中用foreach語(yǔ)句遍歷數(shù)組及將數(shù)組作為參數(shù)的用法
這篇文章主要介紹了C#中用foreach語(yǔ)句遍歷數(shù)組及將數(shù)組作為參數(shù)的用法,C#的數(shù)組可作為實(shí)參傳遞給方法形參,需要的朋友可以參考下2016-01-01C#固定大小緩沖區(qū)及使用指針復(fù)制數(shù)據(jù)詳解
這篇文章主要為大家介紹了C#固定大小緩沖區(qū)及使用指針復(fù)制數(shù)據(jù)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12