C#處理TCP數(shù)據(jù)的方法詳解
前言
Tcp是一個(gè)面向連接的流數(shù)據(jù)傳輸協(xié)議,用人話說就是傳輸是一個(gè)已經(jīng)建立好連接的管道,數(shù)據(jù)都在管道里像流水一樣流淌到對(duì)端。
那么數(shù)據(jù)必然存在幾個(gè)問題,比如數(shù)據(jù)如何持續(xù)的讀取,數(shù)據(jù)包的邊界等。
Nagle's算法
Nagle 算法的核心思想是,在一個(gè) TCP 連接上,最多只能有一個(gè)未被確認(rèn)的小數(shù)據(jù)包(小于 MSS,即最大報(bào)文段大?。?/p>
優(yōu)勢
減少網(wǎng)絡(luò)擁塞:通過合并小數(shù)據(jù)包,減少了網(wǎng)絡(luò)中的數(shù)據(jù)包數(shù)量,降低了擁塞的可能性。
提高網(wǎng)絡(luò)效率:在低速網(wǎng)絡(luò)中,Nagle 算法可以顯著提高傳輸效率。
劣勢
增加延遲:在交互式應(yīng)用中,Nagle 算法可能導(dǎo)致顯著的延遲,因?yàn)樗却?ACK 或合并數(shù)據(jù)包。
C#中如何配置?
var _socket = new Socket(IPAddress.Any.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _serverSocket.NoDelay = _options.NoDelay;
連接超時(shí)
在調(diào)用客戶端Socket連接服務(wù)器的時(shí)候,可以設(shè)置連接超時(shí)機(jī)制,具體可以傳入一個(gè)任務(wù)的取消令牌,并且設(shè)置超時(shí)時(shí)間。
CancellationTokenSource connectTokenSource = new CancellationTokenSource(); connectTokenSource.CancelAfter(3000); //3秒 await _socket.ConnectAsync(RemoteEndPoint, connectTokenSource.Token);
SSL加密傳輸
TCP使用SSL加密傳輸,通過非對(duì)稱加密的方式,利用證書,保證雙方使用了安全的密鑰加密了報(bào)文。在C#中如何配置?
服務(wù)端配置
//創(chuàng)建證書對(duì)象
var _certificate = _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
//與客戶端進(jìn)行驗(yàn)證
if (allowingUntrustedSSLCertificate) //是否允許不受信任的證書
{
SslStream = new SslStream(NetworkStream, false,
(obj, certificate, chain, error) => true);
}
else
{
SslStream = new SslStream(NetworkStream, false);
}
try
{
//serverCertificate:用于對(duì)服務(wù)器進(jìn)行身份驗(yàn)證的 X509Certificate
//clientCertificateRequired:一個(gè) Boolean 值,指定客戶端是否必須為身份驗(yàn)證提供證書
//checkCertificateRevocation:一個(gè) Boolean 值,指定在身份驗(yàn)證過程中是否檢查證書吊銷列表
await SslStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions()
{
ServerCertificate = x509Certificate,
ClientCertificateRequired = mutuallyAuthenticate,
CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
}, cancellationToken).ConfigureAwait(false);
if (!SslStream.IsEncrypted || !SslStream.IsAuthenticated)
{
return false;
}
if (mutuallyAuthenticate && !SslStream.IsMutuallyAuthenticated)
{
return false;
}
}
catch (Exception)
{
throw;
}
//完成驗(yàn)證后,通過SslStream傳輸數(shù)據(jù)
int readCount = await SslStream.ReadAsync(buffer, _lifecycleTokenSource.Token)
.ConfigureAwait(false);客戶端配置
var _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
if (_options.IsSsl) //如果使用ssl加密傳輸
{
if (_options.AllowingUntrustedSSLCertificate)//是否允許不受信任的證書
{
_sslStream = new SslStream(_networkStream, false,
(obj, certificate, chain, error) => true);
}
else
{
_sslStream = new SslStream(_networkStream, false);
}
_sslStream.ReadTimeout = _options.ReadTimeout;
_sslStream.WriteTimeout = _options.WriteTimeout;
await _sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions()
{
TargetHost = RemoteEndPoint.Address.ToString(),
EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12,
CertificateRevocationCheckMode = _options.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection() { _certificate }
}, connectTokenSource.Token).ConfigureAwait(false);
if (!_sslStream.IsEncrypted || !_sslStream.IsAuthenticated ||
(_options.MutuallyAuthenticate && !_sslStream.IsMutuallyAuthenticated))
{
throw new InvalidOperationException("SSL authenticated faild!");
}
}KeepAlive
keepAlive不是TCP協(xié)議中的,而是各個(gè)操作系統(tǒng)本身實(shí)現(xiàn)的功能,主要是防止一些Socket突然斷開后沒有被感知到,導(dǎo)致一直浪費(fèi)資源的情況。
/// <summary>
/// 開啟Socket的KeepAlive
/// 設(shè)置tcp協(xié)議的一些KeepAlive參數(shù)
/// </summary>
/// <param name="socket"></param>
/// <param name="tcpKeepAliveInterval">沒有接收到對(duì)方確認(rèn),繼續(xù)發(fā)送KeepAlive的發(fā)送頻率</param>
/// <param name="tcpKeepAliveTime">KeepAlive的空閑時(shí)長,或者說每次正常發(fā)送心跳的周期</param>
/// <param name="tcpKeepAliveRetryCount">KeepAlive之后設(shè)置最大允許發(fā)送?;钐綔y包的次數(shù),到達(dá)此次數(shù)后直接放棄嘗試,并關(guān)閉連接</param>
internal static void SetKeepAlive(this Socket socket, int tcpKeepAliveInterval, int tcpKeepAliveTime, int tcpKeepAliveRetryCount)
{
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, tcpKeepAliveInterval);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, tcpKeepAliveTime);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, tcpKeepAliveRetryCount);
}具體的開啟,還需要看操作系統(tǒng)的版本以及不同操作系統(tǒng)的支持。
粘包斷包處理
Pipe & ReadOnlySequence

TCP面向應(yīng)用是流式數(shù)據(jù)傳輸,所以接收端接到的數(shù)據(jù)是像流水一樣從管道中傳來,每次取到的數(shù)據(jù)取決于應(yīng)用設(shè)置的緩沖區(qū)大小,以及套接字本身緩沖區(qū)待讀取字節(jié)數(shù)。
C#中提供的Pipe就如上圖一樣,是一個(gè)管道Pipe有兩個(gè)對(duì)象成員,一個(gè)是PipeWriter,一個(gè)是PipeReader,可以理解為一個(gè)是生產(chǎn)者,專門往管道里灌輸數(shù)據(jù)流,即字節(jié)流,一個(gè)是消費(fèi)者,專門從管道里獲取字節(jié)流進(jìn)行處理。
可以看到Pipe中的數(shù)據(jù)包是用鏈表關(guān)聯(lián)的,但是這個(gè)數(shù)據(jù)包是從Socke緩沖區(qū)每次取到的數(shù)據(jù)包,它不一定是一個(gè)完整的數(shù)據(jù)包,所以這些數(shù)據(jù)包連接起來后形成了一個(gè)C#提供的另外一個(gè)抽象的對(duì)象ReadOnlySequence。
但是這里還是沒有提供太好的處理斷包和粘包的辦法,因?yàn)閿喟嘲奶幚硇枰獌煞矫?/p>
1、業(yè)務(wù)數(shù)據(jù)包的定義
2、數(shù)據(jù)流切割出一個(gè)個(gè)完整的數(shù)據(jù)包
假設(shè)業(yè)務(wù)已經(jīng)定義好了數(shù)據(jù)包,那么我們?nèi)绾螐腜ipe中這些數(shù)據(jù)包根據(jù)業(yè)務(wù)定義來從不同的數(shù)據(jù)包中切割出一個(gè)完整的包,那么就需要ReadOnlySequence,它提供的操作方法,非常方便我們?nèi)デ懈顢?shù)據(jù),主要是頭尾數(shù)據(jù)包的切割。
假設(shè)我們業(yè)務(wù)層定義了一個(gè)數(shù)據(jù)包結(jié)構(gòu),數(shù)據(jù)包是不定長的,包體長度每次都寫在包頭里,我們來實(shí)現(xiàn)一個(gè)數(shù)據(jù)包過濾器。
//收到消息
while (!_receiveDataTokenSource.Token.IsCancellationRequested)
{
try
{
//從pipe中獲取緩沖區(qū)
Memory<byte> buffer = _pipeWriter.GetMemory(_options.BufferSize);
int readCount = 0;
readCount = await _sslStream.ReadAsync(buffer, _lifecycleTokenSource.Token).ConfigureAwait(false);
if (readCount > 0)
{
var data = buffer.Slice(0, readCount);
//告知消費(fèi)者,往Pipe的管道中寫入了多少字節(jié)數(shù)據(jù)
_pipeWriter.Advance(readCount);
}
else
{
if (IsDisconnect())
{
await DisConnectAsync();
}
throw new SocketException();
}
FlushResult result = await _pipeWriter.FlushAsync().ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}
catch (IOException)
{
//TODO log
break;
}
catch (SocketException)
{
//TODO log
break;
}
catch (TaskCanceledException)
{
//TODO log
break;
}
}
_pipeWriter.Complete();
//消費(fèi)者處理數(shù)據(jù)
while (!_lifecycleTokenSource.Token.IsCancellationRequested)
{
ReadResult result = await _pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
ReadOnlySequence<byte> data;
do
{
//通過過濾器得到一個(gè)完整的包
data = _receivePackageFilter.ResolvePackage(ref buffer);
if (!data.IsEmpty)
{
OnReceivedData?.Invoke(this, new ClientDataReceiveEventArgs(data.ToArray()));
}
}
while (!data.IsEmpty && buffer.Length > 0);
_pipeReader.AdvanceTo(buffer.Start);
}
_pipeReader.Complete();
/// <summary>
/// 解析數(shù)據(jù)包
/// 固定報(bào)文頭解析協(xié)議
/// </summary>
/// <param name="headerSize">數(shù)據(jù)報(bào)文頭的大小</param>
/// <param name="bodyLengthIndex">數(shù)據(jù)包大小在報(bào)文頭中的位置</param>
/// <param name="bodyLengthBytes">數(shù)據(jù)包大小在報(bào)文頭中的長度</param>
/// <param name="IsLittleEndian">數(shù)據(jù)報(bào)文大小端。windows中通常是小端,unix通常是大端模式</param>
/// </summary>
/// <param name="sequence">一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包</param>
public override ReadOnlySequence<byte> ResolvePackage(ref ReadOnlySequence<byte> sequence)
{
var len = sequence.Length;
if (len < _bodyLengthIndex) return default;
var bodyLengthSequence = sequence.Slice(_bodyLengthIndex, _bodyLengthBytes);
byte[] bodyLengthBytes = ArrayPool<byte>.Shared.Rent(_bodyLengthBytes);
try
{
int index = 0;
foreach (var item in bodyLengthSequence)
{
Array.Copy(item.ToArray(), 0, bodyLengthBytes, index, item.Length);
index += item.Length;
}
long bodyLength = 0;
int offset = 0;
if (!_isLittleEndian)
{
offset = bodyLengthBytes.Length - 1;
foreach (var bytes in bodyLengthBytes)
{
bodyLength += bytes << (offset * 8);
offset--;
}
}
else
{
foreach (var bytes in bodyLengthBytes)
{
bodyLength += bytes << (offset * 8);
offset++;
}
}
if (sequence.Length < _headerSize + bodyLength)
return default;
var endPosition = sequence.GetPosition(_headerSize + bodyLength);
var data = sequence.Slice(0, endPosition);//得到完整數(shù)據(jù)包
sequence = sequence.Slice(endPosition);//緩沖區(qū)中去除取到的完整包
return data;
}
finally
{
ArrayPool<byte>.Shared.Return(bodyLengthBytes);
}
}以上就是實(shí)現(xiàn)了固定數(shù)據(jù)包頭實(shí)現(xiàn)粘包斷包處理的部分代碼。
關(guān)于TCP的連接還有一些,比如客戶端連接限制,空閑連接關(guān)閉等。
到此這篇關(guān)于C#處理TCP數(shù)據(jù)的方法詳解的文章就介紹到這了,更多相關(guān)C#處理TCP數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于mvc5+ef6+Bootstrap框架實(shí)現(xiàn)身份驗(yàn)證和權(quán)限管理
最近剛做完一個(gè)項(xiàng)目,項(xiàng)目架構(gòu)師使用mvc5+ef6+Bootstrap,用的是vs2015,數(shù)據(jù)庫是sql server2014。下面小編把mvc5+ef6+Bootstrap項(xiàng)目心得之身份驗(yàn)證和權(quán)限管理模塊的實(shí)現(xiàn)思路分享給大家,需要的朋友可以參考下2016-06-06
WinForm使用DataGridView實(shí)現(xiàn)類似Excel表格的查找替換功能
這篇文章主要介紹了WinForm使用DataGridView實(shí)現(xiàn)類似Excel表格的查找替換功能,現(xiàn)在小編通過本文給大家分享查找替換實(shí)現(xiàn)過程,需要的朋友可以參考下2021-07-07
WPF實(shí)現(xiàn)繪制餅狀統(tǒng)計(jì)圖的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何使用WPF實(shí)現(xiàn)繪制簡單的餅狀統(tǒng)計(jì)圖,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-10-10
利用MySqlBulkLoader實(shí)現(xiàn)批量插入數(shù)據(jù)的示例詳解
MySQLBulkLoader是MySQL?Connector/Net類中的一個(gè)類,用于包裝MySQL語句。本文將利用MySqlBulkLoader實(shí)現(xiàn)批量插入數(shù)據(jù)功能,感興趣的可以了解一下2022-06-06

