Netty解決 TCP 粘包拆包的方法
什么是粘包/拆包
一般所謂的TCP粘包是在一次接收數(shù)據(jù)不能完全地體現(xiàn)一個完整的消息數(shù)據(jù)。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理數(shù)據(jù),再加上網(wǎng)絡(luò)上MTU的往往小于在應(yīng)用處理的消息數(shù)據(jù),所以就會引發(fā)一次接收的數(shù)據(jù)無法滿足消息的需要,導(dǎo)致粘包的存在。處理粘包的唯一方法就是制定應(yīng)用層的數(shù)據(jù)通訊協(xié)議,通過協(xié)議來規(guī)范現(xiàn)有接收的數(shù)據(jù)是否滿足消息數(shù)據(jù)的需要。
我們都知道TCP是基于字節(jié)流的傳輸協(xié)議。
那么數(shù)據(jù)在通信層傳播其實就像河水一樣并沒有明顯的分界線,而數(shù)據(jù)具體表示什么意思什么地方有句號什么地方有分號這個對于TCP底層來說并不清楚。應(yīng)用層向TCP層發(fā)送用于網(wǎng)間傳輸?shù)?、?位字節(jié)表示的數(shù)據(jù)流,然后TCP把數(shù)據(jù)流分區(qū)成適當(dāng)長度的報文段,之后TCP把結(jié)果包傳給IP層,由它來通過網(wǎng)絡(luò)將包傳送給接收端實體的TCP層。
所以對于這個數(shù)據(jù)拆分成大包小包的問題就是我們今天要講的粘包和拆包的問題。
1、TCP粘包拆包問題說明
粘包和拆包這兩個概念估計大家還不清楚,通過下面這張圖我們來分析一下:
假設(shè)客戶端分別發(fā)送兩個數(shù)據(jù)包D1,D2個服務(wù)端,但是發(fā)送過程中數(shù)據(jù)是何種形式進(jìn)行傳播這個并不清楚,分別有下列4種情況:
- 服務(wù)端一次接受到了D1和D2兩個數(shù)據(jù)包,兩個包粘在一起,稱為粘包;
- 服務(wù)端分兩次讀取到數(shù)據(jù)包D1和D2,沒有發(fā)生粘包和拆包;
- 服務(wù)端分兩次讀到了數(shù)據(jù)包,第一次讀到了D1和D2的部分內(nèi)容,第二次讀到了D2的剩下部分,這個稱為拆包;
- 服務(wù)器分三次讀到了數(shù)據(jù)部分,第一次讀到了D1包,第二次讀到了D2包的部分內(nèi)容,第三次讀到了D2包的剩下內(nèi)容。
2、TCP粘包產(chǎn)生原因
我們知道在TCP協(xié)議中,應(yīng)用數(shù)據(jù)分割成TCP認(rèn)為最適合發(fā)送的數(shù)據(jù)塊,這部分是通過“MSS”(最大數(shù)據(jù)包長度)選項來控制的,通常這種機制也被稱為一種協(xié)商機制,MSS規(guī)定了TCP傳往另一端的最大數(shù)據(jù)塊的長度。這個值TCP協(xié)議在實現(xiàn)的時候往往用MTU值代替(需要減去IP數(shù)據(jù)包包頭的大小20Bytes和TCP數(shù)據(jù)段的包頭20Bytes)所以往往MSS為1460。通訊雙方會根據(jù)雙方提供的MSS值得最小值確定為這次連接的最大MSS值。
tcp為提高性能,發(fā)送端會將需要發(fā)送的數(shù)據(jù)發(fā)送到緩沖區(qū),等待緩沖區(qū)滿了之后,再將緩沖中的數(shù)據(jù)發(fā)送到接收方。同理,接收方也有緩沖區(qū)這樣的機制,來接收數(shù)據(jù)。
發(fā)生粘包拆包的原因主要有以下這些:
- 應(yīng)用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小將發(fā)生拆包;
- 進(jìn)行MSS大小的TCP分段。MSS是TCP報文段中的數(shù)據(jù)字段的最大長度,當(dāng)TCP報文長度-TCP頭部長度>mss的時候?qū)l(fā)生拆包;
- 應(yīng)用程序?qū)懭霐?shù)據(jù)小于套接字緩沖區(qū)大小,網(wǎng)卡將應(yīng)用多次寫入的數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)上,將發(fā)生粘包;
- 數(shù)據(jù)包大于MTU的時候?qū)M(jìn)行切片。MTU即(Maxitum Transmission Unit) 最大傳輸單元,由于以太網(wǎng)傳輸電氣方面的限制,每個以太網(wǎng)幀都有最小的大小64bytes最大不能超過1518bytes,刨去以太網(wǎng)幀的幀頭14Bytes和幀尾CRC校驗部分4Bytes,那么剩下承載上層協(xié)議的地方也就是Data域最大就只能有1500Bytes這個值我們就把它稱之為MTU。這個就是網(wǎng)絡(luò)層協(xié)議非常關(guān)心的地方,因為網(wǎng)絡(luò)層協(xié)議比如IP協(xié)議會根據(jù)這個值來決定是否把上層傳下來的數(shù)據(jù)進(jìn)行分片。
3、如何解決TCP粘包拆包
我們知道tcp是無界的數(shù)據(jù)流,且協(xié)議本身無法避免粘包,拆包的發(fā)生,那我們只能在應(yīng)用層數(shù)據(jù)協(xié)議上,加以控制。通常在制定傳輸數(shù)據(jù)時,可以使用如下方法:
- 設(shè)置定長消息,服務(wù)端每次讀取既定長度的內(nèi)容作為一條完整消息;
- 使用帶消息頭的協(xié)議、消息頭存儲消息開始標(biāo)識及消息長度信息,服務(wù)端獲取消息頭的時候解析出消息長度,然后向后讀取該長度的內(nèi)容;
- 設(shè)置消息邊界,服務(wù)端從網(wǎng)絡(luò)流中按消息邊界分離出消息內(nèi)容。比如在消息末尾加上換行符用以區(qū)分消息結(jié)束。
當(dāng)然應(yīng)用層還有更多復(fù)雜的方式可以解決這個問題,這個就屬于網(wǎng)絡(luò)層的問題了,我們還是用java提供的方式來解決這個問題。我們先看一個例子看看粘包是如何發(fā)生的。
服務(wù)端:
public class HelloWordServer { private int port; public HelloWordServer(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()); try { ChannelFuture future = server.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { HelloWordServer server = new HelloWordServer(7788); server.start(); } }
服務(wù)端Initializer:
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 字符串解碼 和 編碼 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // 自己的邏輯Handler pipeline.addLast("handler", new HelloWordServerHandler()); } }
服務(wù)端handler:
public class HelloWordServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String)msg; System.out.println("server receive order : " + body + ";the counter is: " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
客戶端:
public class HelloWorldClient { private int port; private String address; public HelloWorldClient(int port,String address) { this.port = port; this.address = address; } public void start(){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ClientChannelInitializer()); try { ChannelFuture future = bootstrap.connect(address,port).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } public static void main(String[] args) { HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1"); client.start(); } }
客戶端Initializer:
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // 客戶端的邏輯 pipeline.addLast("handler", new HelloWorldClientHandler()); } }
客戶端handler:
public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter { private byte[] req; private int counter; public BaseClientHandler() { req = ("Unless required by applicable law or agreed to in writing, software\n" + " distributed under the License is distributed on an \"AS IS\" BASIS,\n" + " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + " See the License for the specific language governing permissions and\n" + " limitations under the License.This connector uses the BIO implementation that requires the JSSE\n" + " style configuration. When using the APR/native implementation, the\n" + " penSSL style configuration is required as described in the APR/native\n" + " documentation.An Engine represents the entry point (within Catalina) that processes\n" + " every request. The Engine implementation for Tomcat stand alone\n" + " analyzes the HTTP headers included with the request, and passes them\n" + " on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\n" + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + "# See the License for the specific language governing permissions and\n" + "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\n" + "# each component that extends LifecycleBase changing state:\n" + "#org.apache.catalina.util.LifecycleBase.level = FINE" ).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message; //將上面的所有字符串作為一個消息體發(fā)送出去 message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String buf = (String)msg; System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
運行客戶端和服務(wù)端我們能看到:
我們看到這個長長的字符串被截成了2段發(fā)送,這就是發(fā)生了拆包的現(xiàn)象。同樣粘包我們也很容易去模擬,我們把BaseClientHandler中的channelActive方法里面的:
message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message);
這幾行代碼是把我們上面的一長串字符轉(zhuǎn)成的byte數(shù)組寫進(jìn)流里發(fā)送出去,那么我們可以在這里把上面發(fā)送消息的這幾行循環(huán)幾遍這樣發(fā)送的內(nèi)容增多了就有可能在拆包的時候把上一條消息的一部分分配到下一條消息里面了,修改如下:
for (int i = 0; i < 3; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); }
改完之后我們再運行一下,輸出太長不好截圖,我們在輸出結(jié)果中能看到循環(huán)3次之后的消息服務(wù)端收到的就不是之前的完整的一條了,而是被拆分了4次發(fā)送。
對于上面出現(xiàn)的粘包和拆包的問題,Netty已有考慮,并且有實施的方案:LineBasedFrameDecoder。
我們重新改寫一下ServerChannelInitializer:
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(2048)); // 字符串解碼 和 編碼 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // 自己的邏輯Handler pipeline.addLast("handler", new BaseServerHandler()); } }
新增:pipeline.addLast(new LineBasedFrameDecoder(2048))。同時,我們還得對上面發(fā)送的消息進(jìn)行改造BaseClientHandler:
public class BaseClientHandler extends ChannelInboundHandlerAdapter { private byte[] req; private int counter; req = ("Unless required by applicable dfslaw or agreed to in writing, software" + " distributed under the License is distributed on an \"AS IS\" BASIS," + " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." + " See the License for the specific language governing permissions and" + " limitations under the License.This connector uses the BIO implementation that requires the JSSE" + " style configuration. When using the APR/native implementation, the" + " penSSL style configuration is required as described in the APR/native" + " documentation.An Engine represents the entry point (within Catalina) that processes" + " every request. The Engine implementation for Tomcat stand alone" + " analyzes the HTTP headers included with the request, and passes them" + " on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software" + "# distributed under the License is distributed on an \"AS IS\" BASIS," + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." + "# See the License for the specific language governing permissions and" + "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log" + "# each component that extends LifecycleBase changing state:" + "#org.apache.catalina.util.LifecycleBase.level = FINE\n" ).getBytes(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message; message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String buf = (String)msg; System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
去掉所有的”\n”,只保留字符串末尾的這一個。原因稍后再說。channelActive方法中我們不必再用循環(huán)多次發(fā)送消息了,只發(fā)送一次就好(第一個例子中發(fā)送一次的時候是發(fā)生了拆包的),然后我們再次運行,大家會看到這么長一串字符只發(fā)送了一串就發(fā)送完畢。程序輸出我就不截圖了。下面來解釋一下LineBasedFrameDecoder。
LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf 中的可讀字節(jié),判斷看是否有”\n” 或者” \r\n”,如果有,就以此位置為結(jié)束位置,從可讀索引到結(jié)束位置區(qū)間的字節(jié)就組成了一行。它是以換行符為結(jié)束標(biāo)志的解碼器。支持?jǐn)y帶結(jié)束符或者不攜帶結(jié)束符兩種解碼方式,同時支持配置單行的最大長度。如果連續(xù)讀取到最大長度后仍然沒有發(fā)現(xiàn)換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。這個對于我們確定消息最大長度的應(yīng)用場景還是很有幫助。
對于上面的判斷看是否有”\n” 或者” \r\n”以此作為結(jié)束的標(biāo)志我們可能回想,要是沒有”\n” 或者” \r\n”那還有什么別的方式可以判斷消息是否結(jié)束呢。別擔(dān)心,Netty對于此已經(jīng)有考慮,還有別的解碼器可以幫助我們解決問題,
到此這篇關(guān)于Netty解決 TCP 粘包拆包的方法的文章就介紹到這了,更多相關(guān)Netty解決 TCP 粘包拆包內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合Shiro兩種方式(總結(jié))
這篇文章主要介紹了SpringBoot整合Shiro兩種方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06java 配置MyEclipse Maven環(huán)境具體實現(xiàn)步驟
這篇文章主要介紹了 java 配置MyEclipse Maven環(huán)境具體實現(xiàn)步驟的相關(guān)資料,具有一定的參考價值,需要的朋友可以參考下2016-11-11總結(jié)Java常用加解密方法AES?SHA1?md5
這篇文章主要為大家介紹了Java常用加密方法AES?SHA1?md5總結(jié)及示例demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06SpringBoot如何使用Undertow做服務(wù)器
這篇文章主要介紹了SpringBoot如何使用Undertow做服務(wù)器,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07