Netty開發(fā)及粘包實(shí)戰(zhàn)解決分析
1. Netty介紹
Netty是一款開源的Java網(wǎng)絡(luò)編程框架,廣泛應(yīng)用于很多高流量的服務(wù)器端應(yīng)用程序:
- 異步和事件驅(qū)動(dòng):Netty基于NIO(非阻塞I/O)構(gòu)建,操作都是異步回調(diào)來觸發(fā)事件,如連接建立、數(shù)據(jù)到達(dá)等。
- 高性能:Netty的一大優(yōu)點(diǎn)就是高性能。它的設(shè)計(jì)能夠讓你最大限度地利用現(xiàn)代的多核硬件。
- 靈活的協(xié)議支持:Netty支持各種協(xié)議,包括TCP、UDP、HTTP/HTTPS、Unix Socket、WebSockets等。
- 零拷貝:Netty支持“零拷貝”,這可以減少不必要的系統(tǒng)調(diào)用,顯著提高數(shù)據(jù)處理性能。
Netty 目前最新版本是 4.1.95Final
很久之前 Netty就發(fā)布了 5 的測(cè)試版本,市場(chǎng)上都有很多介紹 Netty5 的書在賣了,但可惜問題太多,最終廢棄了,目前依然只維護(hù) 4 的版本。
1.1. 組件
1.1.1. EventLoopGroup
EventLoopGroup 是一個(gè)線程池,用于管理和調(diào)度 EventLoop 對(duì)象。在 Netty 中,每個(gè) EventLoopGroup 有一個(gè)或多個(gè) EventLoop,用于處理連接請(qǐng)求和 I/O 操作,而每個(gè)EventLoop是單線程的。
所以Netty可以通過EventLoopGroup的構(gòu)造調(diào)參,來實(shí)現(xiàn)不同的Reactor模型:
(1)既可也是單Reactor單線程模型:
EventLoopGroup group = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)
(2)也可以是 主從Reactor多線程模型:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(n); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup)
主從分工
- BossEventLoopGroup:負(fù)責(zé)接收客戶端的連接請(qǐng)求。并將連接請(qǐng)求分配給workerEventLoopGroup中的某個(gè)EventLoop進(jìn)行處理。BossGroup中通常只需一個(gè)EventLoop;
- WorkerEventLoopGroup:負(fù)責(zé)處理服務(wù)器端的連接和數(shù)據(jù)讀寫操作。每個(gè)EventLoop都綁定在一個(gè)具體的線程上,在運(yùn)行過程中只處理該線程所監(jiān)聽的IO事件。 workerGroup通常需要多個(gè)EventLoop。
1.1.2. EventLoop
EventLoop 則是事件循環(huán)的核心,負(fù)責(zé)監(jiān)聽和處理 Channel 中的事件和 I/O 操作。在 EventLoopGroup 中,每個(gè) EventLoop 都是獨(dú)立的,可以并發(fā)地處理多個(gè) Channel 上的事件和 I/O 操作。
1.1.3. Channel 和 ByteBuf
定義
- Channel:代表了一個(gè)網(wǎng)絡(luò)通道,可以用于進(jìn)行網(wǎng)絡(luò)通信。通過使用 Channel,我們可以連接到遠(yuǎn)程服務(wù)器、發(fā)送和接收數(shù)據(jù)等。
- ByteBuf:則是用于管理和操作數(shù)據(jù)的緩沖區(qū),通過使用 ByteBuf,我們可以進(jìn)行數(shù)據(jù)的讀、寫、復(fù)制、切片、合并等操作。
搭配使用
在 Netty 中,Channel 和 ByteBuf 是緊密結(jié)合的,通常一次數(shù)據(jù)傳輸會(huì)涉及到兩個(gè) Channel 和兩個(gè) ByteBuf 對(duì)象,分別代表了發(fā)送端和接收端的數(shù)據(jù)緩沖區(qū)。以下是 Channel 和 ByteBuf 的搭配使用流程:
- 創(chuàng)建 Channel:首先,我們需要?jiǎng)?chuàng)建一個(gè) Channel 對(duì)象,用于表示一個(gè)網(wǎng)絡(luò)通道??梢酝ㄟ^ Bootstrap 或 ServerBootstrap 類來創(chuàng)建 Channel 對(duì)象,并配置其參數(shù)和屬性。
- 寫入數(shù)據(jù):當(dāng)需要向遠(yuǎn)程服務(wù)器發(fā)送數(shù)據(jù)時(shí),我們需要先將數(shù)據(jù)寫入到 ByteBuf 對(duì)象中,然后將 ByteBuf 對(duì)象寫入到 Channel 對(duì)象中。在寫入數(shù)據(jù)時(shí),可以通過 write() 或 writeAndFlush() 方法來實(shí)現(xiàn)。
- 讀取數(shù)據(jù):當(dāng)遠(yuǎn)程服務(wù)器發(fā)送數(shù)據(jù)時(shí),我們需要通過 Channel 對(duì)象來讀取數(shù)據(jù)。讀取數(shù)據(jù)時(shí),Channel 對(duì)象會(huì)將數(shù)據(jù)存儲(chǔ)到 ByteBuf 對(duì)象中,我們可以通過 read() 方法來獲取數(shù)據(jù)或數(shù)據(jù)大小。在讀取數(shù)據(jù)之后,我們需要及時(shí)釋放 ByteBuf 對(duì)象,以便避免內(nèi)存泄漏和內(nèi)存溢出等問題。
- 釋放資源:當(dāng)數(shù)據(jù)傳輸完成后,我們需要釋放 Channel 和 ByteBuf 對(duì)象的資源。在 Netty 中,Channel 和 ByteBuf 對(duì)象都需要顯式地釋放資源,以避免內(nèi)存泄漏和內(nèi)存溢出等問題。可以通過 release() 方法來釋放 ByteBuf 對(duì)象,通過 close() 方法來釋放 Channel 對(duì)象。
1.1.4. ChannelPipeline 和 Channel
定義
- Channel:對(duì)象表示一個(gè)通信通道,可以進(jìn)行數(shù)據(jù)的讀寫和事件的觸發(fā)等操作。
- ChannelPipeline:則是一個(gè)事件處理器的鏈表,用于處理 Channel 中的事件和數(shù)據(jù)。
每個(gè) Channel 都有一個(gè)關(guān)聯(lián)的 ChannelPipeline 對(duì)象,當(dāng)有事件發(fā)生時(shí),Netty 會(huì)將事件從 Channel 中傳遞到 ChannelPipeline 中,然后按照順序依次觸發(fā)各個(gè)事件處理器 ChannelHandler 的邏輯。當(dāng)事件處理完畢后,Netty 會(huì)將處理結(jié)果返回到 Channel 中,以便進(jìn)行數(shù)據(jù)的讀寫等操作。
在 ChannelPipeline 中,可以添加多個(gè)事件處理器,用于處理不同類型的事件和數(shù)據(jù)。例如,可以添加一個(gè)消息解碼器、一個(gè)消息編碼器、一個(gè)業(yè)務(wù)邏輯處理器等。每個(gè)事件處理器都可以進(jìn)行特定的邏輯處理,并將處理結(jié)果傳遞給下一個(gè)事件處理器。
1.2. 網(wǎng)絡(luò)協(xié)議
Netty是一個(gè)非常強(qiáng)大和靈活的網(wǎng)絡(luò)編程框架,它支持多種通信協(xié)議。以下是一些Netty支持的通信協(xié)議:
- TCP/IP 和 UDP/IP:Netty 提供了底層的網(wǎng)絡(luò)通信支持,可以構(gòu)建基于TCP/IP或UDP/IP的應(yīng)用。
- HTTP/HTTPS and HTTP/2:Netty 提供了HTTP、HTTPS以及HTTP/2的高級(jí)支持。
- WebSocket:Netty 支持 WebSocket,允許 Web 瀏覽器和服務(wù)器之間進(jìn)行全雙工通信。
- Google Protobuf:Netty 為 Google 的 Protobuf 序列化庫提供了支持。
- SSL/TLS:通過JDK的Secure Socket Extension (JSSE),Netty 支持 SSL/TLS 實(shí)現(xiàn)安全通信。
- Unix Domain Socket:從 Netty 4.1版本開始,Netty也開始支持 Unix Domain Socket。
因?yàn)?Netty 支持的網(wǎng)絡(luò)協(xié)議豐富,所以當(dāng)有非Http協(xié)議網(wǎng)絡(luò)通信的需求時(shí),大家第一時(shí)間會(huì)想到 Netty。
2. 代碼示例
2.1. 基于tcp協(xié)議
2.1.1.服務(wù)端
pom
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.63.Final</version> </dependency>
服務(wù)端
@Component public class NettyServer { // 創(chuàng)建兩個(gè)線程組,分別用于接收客戶端連接和處理網(wǎng)絡(luò)IO操作 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); @PostConstruct public void start() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 指定使用 NioServerSocketChannel 作為通道實(shí)現(xiàn) .channel(NioServerSocketChannel.class) // 定義 ChannelPipeline(多個(gè)ChannelHandler組合) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ServerHandler()); } }); // 綁定端口,開始接收進(jìn)來的連接 ChannelFuture f = b.bind(8080).sync(); if (f.isSuccess()) { System.out.println("啟動(dòng)Netty服務(wù)成功,端口號(hào):" + 8080); } } @PreDestroy public void shutdown() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ChannelHandler 消息處理
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("Received message from client: " + msg); // 回復(fù)消息給客戶端 //ctx.writeAndFlush("Received your message: " + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
2.1.2.客戶端
客戶端
@DependsOn({"nettyServer"}) @Component public class NettyClient { private EventLoopGroup group; private Channel channel; @PostConstruct public void start() throws InterruptedException { group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync(); if (future.isSuccess()) { System.out.println("連接服務(wù)器成功"); } channel = future.channel(); } @PreDestroy public void destroy() { if (group != null) { group.shutdownGracefully(); } }
ChannelHandler 消息處理
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("Server response:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
2.2. 基于Unix Socket協(xié)議
其他的不變,這里只關(guān)注客戶服務(wù)端代碼。
2.2.1. 代碼
服務(wù)端
private final EventLoopGroup bossGroup = new KQueueEventLoopGroup(); private final EventLoopGroup workerGroup = new KQueueEventLoopGroup(); @PostConstruct public void start() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(KQueueServerDomainSocketChannel.class) .childHandler(new ChannelInitializer<KQueueDomainSocketChannel>() { @Override public void initChannel(KQueueDomainSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture f = b.bind(new DomainSocketAddress("/tmp/test.sock")).sync(); if (f.isSuccess()) { System.out.println("啟動(dòng)Netty服務(wù)成功,文件:" + "/tmp/test.sock"); } }
客戶端
public void start() throws InterruptedException { group = new KQueueEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(KQueueDomainSocketChannel.class) .handler(new ChannelInitializer<KQueueDomainSocketChannel>() { @Override protected void initChannel(KQueueDomainSocketChannel socketChannel) { socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect(new DomainSocketAddress("/tmp/test.sock")).sync(); if (future.isSuccess()) { System.out.println("連接服務(wù)器成功"); } channel = future.channel(); }
2.2.2. 分析
Unix Socket協(xié)議
Unix Domain Socket(簡稱UDS)是一個(gè)用于實(shí)現(xiàn)本地進(jìn)程間通信的協(xié)議。與使用網(wǎng)絡(luò)套接字(socket)進(jìn)行通信不同,UDS僅用于同一臺(tái)機(jī)器上的相鄰進(jìn)程之間的通信。
在Unix/Linux系統(tǒng)中,UDS通常被用于代替TCP/IP套接字來提高性能和安全性。不過它們可以通過文件系統(tǒng)路徑來建立連接,不能跨機(jī)器通信。
Netty中協(xié)議切換
通過對(duì)比上述代碼,可以看出netty中切換協(xié)議是比較簡單的,換成對(duì)應(yīng)的 Channel 實(shí)現(xiàn)類,以及連接方式就可以了。
因?yàn)槭莔ac中運(yùn)行,示例代碼中用KQueueDomainSocketChannel替代DomainSocketChannel
2.3. 測(cè)試
Controller發(fā)消息
@RestController public class MsgController { @Autowired private NettyClient nettyClient; @PostMapping("/send") public ResponseEntity<Void> sendMsg(@RequestBody String msg) { System.out.println(msg.getBytes(StandardCharsets.UTF_8).length); try { for (int i = 0; i < 1000; i++) { nettyClient.send(msg); } return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR); } } }
測(cè)試結(jié)果
前面已經(jīng)基于 TCP協(xié)議寫好了netty的客戶端、服務(wù)端,
現(xiàn)在寫接口,可以通過客戶端給服務(wù)端發(fā)消息,不過單次調(diào)用會(huì)一次性發(fā)1000遍。
接口調(diào)用傳入:hello
預(yù)期結(jié)果:
Received message from client: hello
Received message from client: hello
... ... // 同樣輸出1000遍
實(shí)際結(jié)果:
Received message from client: hello
Received message from client: hellohello
Received message from client: hellohe
Received message from client: llohellohellohellohello
Received message from client: hellohellohello
... ... // 無規(guī)則
出現(xiàn)問題的原因就是下一章要將的粘包、拆包問題。
3. 粘包、拆包問題
3.1. 問題分析
3.1.1. tcp協(xié)議出現(xiàn)問題的原因
粘包/拆包問題是由TCP協(xié)議本身造成的,和Netty本身無關(guān),任何基于TCP協(xié)議實(shí)現(xiàn)數(shù)據(jù)傳輸?shù)募夹g(shù)都會(huì)面臨這個(gè)問題,原因如下:
- 應(yīng)用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小:這種情況下,會(huì)發(fā)生拆包現(xiàn)象,發(fā)送方的TCP協(xié)議棧會(huì)把一次應(yīng)用程序的發(fā)送操作分成多個(gè)數(shù)據(jù)段進(jìn)行發(fā)送。
- 進(jìn)行了多次寫入操作,但數(shù)據(jù)沒有被及時(shí)發(fā)送出去:這可能是由于TCP的Nagle算法造成的。
- 應(yīng)用程序讀取操作不及時(shí):如果接收方面的應(yīng)用層沒有及時(shí)讀取接收緩沖區(qū)的數(shù)據(jù),造成堆積,從而形成一個(gè)大的數(shù)據(jù)塊。如果此時(shí)應(yīng)用層進(jìn)行數(shù)據(jù)讀取,就容易讀取到多個(gè)TCP數(shù)據(jù)段的數(shù)據(jù),形成了粘包現(xiàn)象。
- 網(wǎng)絡(luò)環(huán)境等硬件問題:如網(wǎng)絡(luò)延遲、抖動(dòng)等,也可能導(dǎo)致多個(gè)小的數(shù)據(jù)包合并為一個(gè)大包進(jìn)行傳送,從而導(dǎo)致粘包。
解決粘包和拆包問題的基本策略就是在應(yīng)用層引入數(shù)據(jù)邊界。常見的方法有:固定長度、分隔符、在包頭中加入長度字段等。
3.1.2. 其他協(xié)議為什么沒問題
HTTP 協(xié)議
HTTP 協(xié)議 基于 TCP 協(xié)議構(gòu)建,而 TCP 是一種面向流的協(xié)議,所以理論上可能會(huì)有粘包問題。但是在實(shí)際應(yīng)用中,HTTP 協(xié)議已經(jīng)做了明確的分包處理,因此通常不需要開發(fā)者去處理粘包問題,HTTP 使用了一些特定的方式來定義數(shù)據(jù)包的邊界:
對(duì)于 HTTP/1.0 和 HTTP/1.1,一次完整的 HTTP 交互由一個(gè)請(qǐng)求和一個(gè)響應(yīng)組成,它們都是相對(duì)獨(dú)立的。請(qǐng)求和響應(yīng)都有明確的開始行(請(qǐng)求行或狀態(tài)行)和結(jié)束標(biāo)志(如 Content-Length 頭或 chunked 編碼表示的消息體長度)。這樣可以很清楚地知道報(bào)文的開始和結(jié)束,避免了粘包問題。
對(duì)于 HTTP/2,它引入了二進(jìn)制幀的概念。每個(gè)幀有明確的長度和類型,這也使得在接收端可以準(zhǔn)確地解析出各個(gè)幀,避免粘包問題。
UDP 協(xié)議
UDP 協(xié)議 是一種無連接的、不可靠的協(xié)議,它并沒有像TCP協(xié)議那樣提供流量控制和擁塞控制等功能,因此在傳輸過程中可能會(huì)出現(xiàn)丟包或亂序等問題。由于UDP協(xié)議采用數(shù)據(jù)報(bào)方式進(jìn)行傳輸,每個(gè)UDP數(shù)據(jù)報(bào)都有獨(dú)立的頭部標(biāo)識(shí),因此不會(huì)出現(xiàn)粘包問題。
WebSocket 協(xié)議
WebSocket 協(xié)議 建立連接后,客戶端和服務(wù)器之間會(huì)保持長時(shí)間的連接狀態(tài),可以隨時(shí)發(fā)送和接收數(shù)據(jù)。當(dāng)服務(wù)器發(fā)送數(shù)據(jù)時(shí),會(huì)將數(shù)據(jù)封裝到一個(gè)完整的WebSocket幀中,并通過TCP協(xié)議進(jìn)行傳輸。而客戶端收到數(shù)據(jù)后,會(huì)從WebSocket幀中解析出數(shù)據(jù),并進(jìn)行相應(yīng)處理。這樣就避免了TCP協(xié)議中的“粘包”和“拆包”問題。
3.1.3. Unix Socket 為什么也有問題
Unix Socket(也被稱為 Unix Domain Socket,UDS)主要支持以下兩種類型的通信協(xié)議:
- 流式協(xié)議 (SOCK_STREAM): 類似于 TCP,在發(fā)送和接收數(shù)據(jù)時(shí)提供了字節(jié)流服務(wù)。數(shù)據(jù)在兩個(gè)方向上都是有序的,并且不會(huì)重復(fù)或者丟失。這種模式下,一端發(fā)送的數(shù)據(jù)順序和另一端接收的數(shù)據(jù)順序是相同的。
- 數(shù)據(jù)報(bào)協(xié)議 (SOCK_DGRAM): 這種類型的 socket 提供了一種無需連接的、固定大小的消息服務(wù),類似于 UDP。每次讀操作都返回最多一條完整的消息;如果消息超出緩沖區(qū)的大小,那么該消息可能會(huì)被截?cái)唷?/li>
Unix Socket 的這兩種模式在行為上與 TCP 和 UDP 很相似。因此在基于 SOCK_STREAM 協(xié)議使用 Netty 開發(fā)服務(wù)端和客戶端時(shí),可能會(huì)出現(xiàn)類似粘包的問題。
前面有現(xiàn)成的基于Unix Stream協(xié)議實(shí)現(xiàn)的代碼,我們同樣調(diào)用接口試一下,發(fā)現(xiàn) Unix Socket 同樣會(huì)產(chǎn)生粘包問題
解決思路
結(jié)合HTTP、UDP、WebSocket 解決粘包/拆包問題的思路,同樣也可以推導(dǎo)解決TCP問題的思路:在發(fā)送數(shù)據(jù)時(shí),應(yīng)該設(shè)計(jì)一種協(xié)議來確定消息的邊界,比如:添加特殊的分隔符,或者在每個(gè)消息的頭部包含消息的長度等。
基于這個(gè)思路,Netty 框架提供了 LineBasedFrameDecoder、DelimiterBasedFrameDecoder和 LengthFieldBasedFrameDecoder等解決方案,下面一一介紹。
3.2. 解決方案
3.2.1. LineBasedFrameDecoder
使用行結(jié)束符作為數(shù)據(jù)包的分隔符。每條消息后面都有一個(gè)行結(jié)束符(例如 \n 或 \r\n),它會(huì)一直讀取字節(jié)直到遇到這個(gè)結(jié)束符,然后把之前讀取到的字節(jié)組裝成一條消息。
如果沒有找到行結(jié)束符,那么就認(rèn)為當(dāng)前還沒有讀取到完整的數(shù)據(jù)包,需要將已經(jīng)讀取到的字節(jié)保存起來,等待下次讀取。
代碼-客戶端修改
發(fā)送消息的方法中,每條消息結(jié)尾都加上行結(jié)束符后綴:
public void send(String msg) { if (channel != null) { channel.writeAndFlush(msg + "\\n"); } else { System.out.println("message sending failed, connection not established"); } }
代碼-服務(wù)端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
局限性
- 固定的分隔符:主要是通過 \n 或 \r\n 來標(biāo)識(shí)一個(gè)完整的消息。這意味著如果你的協(xié)議中沒有使用這兩個(gè)字符作為結(jié)束標(biāo)記,或者這兩個(gè)字符在消息體中有特殊含義,則不能正確工作。
- 只支持文本數(shù)據(jù): 主要設(shè)計(jì)為處理文本協(xié)議。對(duì)于二進(jìn)制數(shù)據(jù),尤其是包含
\n
或\r\n
的二進(jìn)制數(shù)據(jù),可能會(huì)出現(xiàn)誤切割的情況。 - 無法處理大數(shù)據(jù)包: 如果一個(gè)非常大的數(shù)據(jù)塊在沒有任何分隔符的情況下被發(fā)送,會(huì)消耗大量內(nèi)存來存儲(chǔ)這些數(shù)據(jù),直到找到一個(gè)分隔符。這可能會(huì)導(dǎo)致內(nèi)存溢出問題。所以構(gòu)造方法中要設(shè)置 maxLength 參數(shù)(如示例中的 1024)。
3.2.2. DelimiterBasedFrameDecoder
解決方式
和LineBasedFrameDecoder類似,當(dāng)接收到數(shù)據(jù)時(shí),會(huì)檢查是否存在分隔符。如果存在,它就認(rèn)為已經(jīng)讀取到了一個(gè)完整的消息,并將這個(gè)消息傳遞給下一個(gè)ChannelHandler進(jìn)行處理。如果不存在,它將繼續(xù)等待,直到讀取到分隔符。
區(qū)別在于,前者的分隔符固定,而它的分隔符可以自定義。
代碼-客戶端修改
發(fā)送消息的方法中,每條消息結(jié)尾都加上行結(jié)束符后綴:
public void send(String msg) { if (channel != null) { channel.writeAndFlush(msg + "$_"); } else { System.out.println("message sending failed, connection not established"); } }
代碼-服務(wù)端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
局限性
- 依賴于特定的分隔符:需要依賴特定的分隔符來判定一個(gè)消息的結(jié)束,但是在某些情況下,這樣的分隔符可能并不存在,或者不能很好地被應(yīng)用在該協(xié)議上。同樣可能出現(xiàn)誤判。
- 不適合二進(jìn)制協(xié)議: 由于DelimiterBasedFrameDecoder主要是針對(duì)文本協(xié)議設(shè)計(jì)的,所以在處理一些二進(jìn)制協(xié)議時(shí)可能會(huì)遇到困難。
- 內(nèi)存問題: 如果一個(gè)非常大的數(shù)據(jù)塊在沒有任何分隔符的情況下被發(fā)送,DelimiterBasedFrameDecoder可能會(huì)消耗過多的內(nèi)存來存儲(chǔ)這些數(shù)據(jù),直到找到一個(gè)分隔符。這可能會(huì)導(dǎo)致內(nèi)存溢出問題。所以也需要設(shè)置 maxFrameLength(如示例中的 1024)。
3.2.3. FixedLengthFrameDecoder
解決方式
工作原理主要是每次從 ByteBuf 中讀取固定長度的字節(jié),然后構(gòu)造成一個(gè)獨(dú)立的 frame 對(duì)象,傳遞給下一個(gè) handler 處理。
這樣可以確保不會(huì)因?yàn)?TCP 粘包導(dǎo)致多個(gè)消息被當(dāng)作一個(gè)消息處理,也不會(huì)因?yàn)?TCP 拆包導(dǎo)致一個(gè)消息被當(dāng)作多個(gè)消息處理。
代碼-服務(wù)端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
因?yàn)閭鬏數(shù)膮?shù)“hello”是5個(gè)字節(jié),這類就固定為5.
局限性
- 固定長度限制: FixedLengthFrameDecoder 只能處理固定長度的消息,如果實(shí)際應(yīng)用中的消息長度不固定,那么就無法使用 FixedLengthFrameDecoder 進(jìn)行解碼。相應(yīng)地,如果消息長度小于固定長度,那么必須填充到固定長度,這就可能會(huì)浪費(fèi)帶寬。
- 無內(nèi)置校驗(yàn): FixedLengthFrameDecoder 僅僅是按照固定長度切分消息,它并不關(guān)心消息的完整性和正確性。如果你想對(duì)消息進(jìn)行校驗(yàn),需要自己實(shí)現(xiàn)。
3.2.4. LengthFieldBasedFrameDecoder
解決方式
- 長度字段標(biāo)識(shí): LengthFieldBasedFrameDecoder 解決粘包問題的方式主要是通過在數(shù)據(jù)包中添加一個(gè)表示后續(xù)數(shù)據(jù)長度的字段,這個(gè)字段的位置和長度可以由開發(fā)者自定義,解碼器會(huì)根據(jù)這個(gè)長度字段得知具體的消息體長度,然后進(jìn)行正確的截取。
- 校驗(yàn)讀取: 當(dāng)接收到新的數(shù)據(jù)包時(shí),解碼器首先找到長度字段,讀取出消息體的長度,然后等待足夠長度的數(shù)據(jù)到達(dá)后,再從 ByteBuf 中讀取,形成一個(gè)完整的消息幀。
- 消除半包讀取: 通過以上方式,LengthFieldBasedFrameDecoder 可以確保每次都能從 ByteBuf 中讀取到完整的消息幀,不會(huì)出現(xiàn)只讀取到半個(gè)消息幀的情況。
在網(wǎng)絡(luò)通信中,發(fā)送和接收數(shù)據(jù)需要遵循同一種協(xié)議。LengthFieldBasedFrameDecoder 是一個(gè)基于長度字段的解碼器,而 LengthFieldPrepender 則是一個(gè)對(duì)應(yīng)的編碼器,它會(huì)在消息體前面加上一個(gè)長度字段。
它們一般會(huì)配套使用,這樣發(fā)送端發(fā)送的數(shù)據(jù)和接收端接收的數(shù)據(jù)結(jié)構(gòu)就會(huì)保持一致,從而能夠正確地進(jìn)行解碼。
代碼-客戶端修改
添加ChannelHandler實(shí)現(xiàn),通過LengthFieldPrepender這個(gè)編碼器,在發(fā)送的消息前添加長度字段(這里的 4 是指長度字段本身占用的字節(jié)數(shù)量):
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
代碼-服務(wù)端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
4. Netty特性優(yōu)化
4.1. 內(nèi)存池 PooledByteBufAllocator
內(nèi)存池是一種用于管理和復(fù)用內(nèi)存塊的技術(shù)??梢员苊忸l繁地分配和釋放內(nèi)存,從而減少系統(tǒng)開銷和內(nèi)存碎片問題,提高系統(tǒng)的效率和性能。
PooledByteBufAllocator(分配器 [?æl??ke?t?r]) 是 Netty 提供的一個(gè)基于內(nèi)存池的 ByteBuf 分配器。與直接創(chuàng)建新的 ByteBuf 實(shí)例相比,PooledByteBufAllocator 提供了重用內(nèi)存的能力,這可以顯著減少內(nèi)存分配和垃圾收集的開銷,提高性能:
- 內(nèi)存分區(qū):PooledByteBufAllocator 將內(nèi)存劃分為多個(gè) Arena,每個(gè) Arena 進(jìn)一步劃分為多個(gè) Chunk 和 Page。通過這種方式,PooledByteBufAllocator 能夠滿足不同大小的內(nèi)存需求,并且能夠快速找到合適的內(nèi)存塊進(jìn)行分配。
- 對(duì)象復(fù)用:當(dāng) ByteBuf 的引用計(jì)數(shù)為 0 時(shí),它的內(nèi)存會(huì)被返回到原來的 Arena 并可以被重用。這避免了頻繁創(chuàng)建和銷毀對(duì)象,降低了系統(tǒng)開銷。
- 線程本地緩存:PooledByteBufAllocator 使用了線程本地緩存技術(shù)(Thread Local Cache),每個(gè)線程都有自己的一份緩存池,這可以減少線程間的競(jìng)爭,進(jìn)一步提高性能。
- 內(nèi)存分配策略:對(duì)于小于 Page 大小的內(nèi)存分配請(qǐng)求,PooledByteBufAllocator 使用 jemalloc 策略進(jìn)行內(nèi)存分配。這是一種高效的內(nèi)存分配策略,能夠減少內(nèi)存碎片,提高內(nèi)存使用率。
通過這些方式,PooledByteBufAllocator 可以有效地復(fù)用內(nèi)存,提高了內(nèi)存使用的效率和性能。
PooledByteBufAllocator 創(chuàng)建 ByteBuf 過程
PooledByteBufAllocator allocator = new PooledByteBufAllocator(); // 分別分配堆內(nèi)存、堆外內(nèi)存,內(nèi)存大小也可以指定,如: allocator.heapBuffer(1024); ByteBuf heapBuffer = allocator.heapBuffer(); ByteBuf directBuffer = allocator.directBuffer(); // 正常將寫入數(shù)據(jù)或讀取 heapBuffer.writeBytes(data); byte b = heapBuffer.readByte(); // 記得不用時(shí)釋放內(nèi)存,堆外內(nèi)存不受垃圾回收,不釋放會(huì)有內(nèi)存泄露 heapBuffer.release(); directBuffer.release();
不過實(shí)際項(xiàng)目中,很少有見過通過創(chuàng)建 PooledByteBufAllocator,再創(chuàng)建 ByteBuf 的。
基本都是由 Unpooled 工具類 創(chuàng)建 ByteBuf。
創(chuàng)建:堆內(nèi)內(nèi)存 OR 堆外內(nèi)存?
(1)堆內(nèi)內(nèi)存:
如果你需要處理的數(shù)據(jù)比較小(比如幾 KB 或幾百 KB),而且需要進(jìn)行頻繁的讀寫操作,那么建議使用堆內(nèi)內(nèi)存。
(2)堆外內(nèi)存:
如果你需要處理的數(shù)據(jù)比較大(比如幾 MB 或幾十 MB),而且需要進(jìn)行頻繁的 IO 操作,那么建議使用堆外內(nèi)存。堆外內(nèi)存是由操作系統(tǒng)管理的,數(shù)據(jù)存儲(chǔ)在操作系統(tǒng)的內(nèi)存中,可以直接進(jìn)行 IO 操作。此外,在使用堆外內(nèi)存時(shí),可以避免 Java 堆和操作系統(tǒng)之間的數(shù)據(jù)拷貝,減少了系統(tǒng)的開銷和延遲。
需要注意的是,堆外內(nèi)存的申請(qǐng)和釋放需要調(diào)用 JNI 接口,因此申請(qǐng)和釋放堆外內(nèi)存的開銷會(huì)比較高。因此一般來說:
對(duì)于小規(guī)模的數(shù)據(jù)處理應(yīng)用,建議使用堆內(nèi)內(nèi)存;對(duì)于大規(guī)模的數(shù)據(jù)處理應(yīng)用,建議使用堆外內(nèi)存
4.2. 內(nèi)存池 Unpooled
Unpooled 是 Netty 中一個(gè)工具類,用于創(chuàng)建不同類型的 ByteBuf 對(duì)象,而且同樣是使用PooledByteBufAllocator 類來分配和管理內(nèi)存。
只不過它提供了一些靜態(tài)方法,可以很方便地創(chuàng)建 HeapBuf、DirectBuf、CompositeBuf 等類型的 ByteBuf 對(duì)象。常見方法:
- buffer():創(chuàng)建一個(gè) HeapBuf 對(duì)象,使用 JVM 堆內(nèi)存來存儲(chǔ)數(shù)據(jù)。
- directBuffer():創(chuàng)建一個(gè) DirectBuf 對(duì)象,使用直接內(nèi)存來存儲(chǔ)數(shù)據(jù)。
- wrappedBuffer():創(chuàng)建一個(gè) CompositeBuf 對(duì)象,可以將多個(gè) ByteBuf 對(duì)象合并成一個(gè)虛擬的 ByteBuf 對(duì)象。
- copiedBuffer():創(chuàng)建一個(gè) HeapBuf 對(duì)象,并將字節(jié)數(shù)組的內(nèi)容復(fù)制到 HeapBuf 中。
- unsafeBuffer():創(chuàng)建一個(gè)不安全的 ByteBuf 對(duì)象,用于一些特殊的場(chǎng)景,例如 JNI 調(diào)用等。
不過同樣要記得在使用完畢后,應(yīng)該及時(shí)調(diào)用 release() 方法來釋放 ByteBuf 對(duì)象的資源哦。
回顧一下:考慮到Netty中 ByteBuf 等常用類,為避免頻繁地分配和釋放內(nèi)存,通過內(nèi)存池實(shí)現(xiàn)內(nèi)存復(fù)用。但 ByteBuf 也是類,頻繁地創(chuàng)建、銷毀對(duì)象同樣有大量的性能開銷,怎么優(yōu)化?
那么接下來我們看一下 對(duì)象池。
4.3. 對(duì)象池 Recycler
Recycler (回收器,[?ri??sa?kl] )是 Netty是一個(gè)對(duì)象池,主要用于重用對(duì)象,避免頻繁創(chuàng)建和銷毀帶來的性能開銷。被廣泛地應(yīng)用于各種場(chǎng)景中,例如 ByteBuf 對(duì)象池、EventExecutor 對(duì)象池、ChannelHandlerContext 對(duì)象池等等。我們還是來看看 ByteBuf。
ByteBuf 中包含一個(gè) Recycler.Handle 對(duì)象,用于管理 ByteBuf 對(duì)象池的創(chuàng)建和銷毀。當(dāng)需要?jiǎng)?chuàng)建一個(gè)新的 ByteBuf 對(duì)象時(shí),無論通過前面介紹的PooledByteBufAllocator、Unpooled,都是通過 ByteBufAllocator 接口提供的 directBuffer() 或 heapBuffer() 等方法來創(chuàng)建。
這些方法就是基于Recycler,會(huì)自動(dòng)從線程本地的對(duì)象池中獲取一個(gè) ByteBuf 對(duì)象,如果對(duì)象池為空,則會(huì)創(chuàng)建一個(gè)新對(duì)象,并將其加入對(duì)象池中。當(dāng)不再需要這個(gè)對(duì)象時(shí),可以通過調(diào)用 release() 方法將其回收到對(duì)象池中,等待下次使用。
ChannelHandlerContext 對(duì)象池也類似,在 Netty 中,可以通過 ChannelHandlerContext 的 newContext() 方法來獲取一個(gè)新的 ChannelHandlerContext 對(duì)象,這個(gè)方法會(huì)從 Recycler 對(duì)象池中獲取一個(gè) ChannelHandlerContext 對(duì)象并進(jìn)行初始化,如果沒有可用的對(duì)象,則會(huì)創(chuàng)建一個(gè)新對(duì)象。在使用完后,通過調(diào)用 ChannelHandlerContext 的 recycle() 方法將其回收到對(duì)象池中,等待下次使用。
當(dāng)然 Recycler 是 Netty 中實(shí)現(xiàn)對(duì)象池的機(jī)制,并不局限于只有 Netty 的這些組件類可以用,任何我們自定義的類都可以。下面看一個(gè)例子。
示例(任何對(duì)象)
public class UserCache { private static final Recycler<User> userRecycler = new Recycler<User>() { @Override protected User newObject(Handle<User> handle) { return new User(handle); } }; static final class User { private String name; private Recycler.Handle<User> handle; public void setName(String name) { this.name = name; } public String getName() { return name; } public User(Recycler.Handle<User> handle) { this.handle = handle; } public void recycle() { handle.recycle(this); } } public static void main(String[] args) { User user1 = userRecycler.get(); user1.setName("hello"); user1.recycle(); User user2 = userRecycler.get(); System.out.println(user1 == user2); } }
左邊的例子中,我們定義了一個(gè)User類,main方法中,user1.recycle(),user1回收了之后,然后 user2 再獲取。
- (1)user2獲取的依然是同一個(gè)對(duì)象,所以打印出的結(jié)果是:hello 和 true。
- (2)如果我們注釋掉 user1.cecycle(),user2 會(huì)獲取不到對(duì)象,打印的結(jié)果就是:null 和 false。
線程安全
另外,Recycler 使用線程本地變量(FastThreadLocal)來存儲(chǔ)對(duì)象,每個(gè)線程都有一個(gè)獨(dú)立的對(duì)象池。這個(gè)機(jī)制可以保證對(duì)象的安全性和線程互相獨(dú)立,避免了線程安全問題和競(jìng)爭條件的出現(xiàn)。
那么這個(gè) FastThreadLocal 是啥?和常見的 ThreadLocal 有啥關(guān)系呢?
4.4. 本地線程優(yōu)化 FastThreadLocal
FastThreadLocal(更快的ThreadLocal) 是 Netty 自己研發(fā)的一個(gè)工具類,用于替換 Java 原生的 ThreadLocal。主要有以下幾個(gè)原因:
- 性能:與 ThreadLocal 相比,F(xiàn)astThreadLocal 在存取線程局部變量時(shí)有更快的速度。在 ThreadLocal 中,每次獲取變量都需要通過哈希映射進(jìn)行查找,當(dāng)線程局部變量很多時(shí),這會(huì)成為一個(gè)性能瓶頸。而 FastThreadLocal 則將所有線程的局部變量存儲(chǔ)在一個(gè)數(shù)組中,通過索引快速定位,提高了存取速度。
- 避免內(nèi)存泄漏:ThreadLocal 在使用不當(dāng)時(shí),很容易造成內(nèi)存泄漏,需要我們?cè)谑褂煤笤偈謩?dòng)調(diào)用reomve()方法。而 FastThreadLocal 能有效避免這個(gè)問題。它會(huì)在每個(gè)線程結(jié)束時(shí)自動(dòng)清理線程局部變量,而不是依賴于 JVM 的垃圾回收。
- 更好的整合:Netty 中很多地方使用了線程局部變量,例如 ByteBuf 的內(nèi)存池、Recycler 對(duì)象池等。有了自己的 FastThreadLocal,Netty 可以更好地控制和優(yōu)化這些功能,提高整體性能。
代碼示例
public class FastThreadLocalDemo { private static final FastThreadLocal<Integer> THREAD_LOCAL = new FastThreadLocal<Integer>() { @Override protected Integer initialValue() throws Exception { return 1; } }; public static void main(String[] args) { new FastThreadLocalThread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " --> " + THREAD_LOCAL.get()); THREAD_LOCAL.set(THREAD_LOCAL.get() + 1); } }, "FastThreadLocalThread-1").start(); } }
注意事項(xiàng)
FastThreadLocal 的使用方式和 ThreadLocal差別不大,但是有幾點(diǎn)需要注意:
- 使用 FastThreadLocal 的線程最好是 FastThreadLocalThread 類型或者其子類。FastThreadLocal 會(huì)在這些線程中有更好的性能。如果使用的是Thread或其他實(shí)現(xiàn)的話,F(xiàn)astThreadLocal 仍然可以工作,但性能會(huì)降級(jí)。
- 相比于 ThreadLocal,F(xiàn)astThreadLocal 的優(yōu)勢(shì)在于當(dāng)一個(gè)線程有多個(gè)線程本地變量時(shí),它可以通過減少哈希沖突和查找來提高性能。但是如果一個(gè)線程只有一個(gè)或者很少的線程本地變量,那么 ThreadLocal 可能會(huì)有更好的性能。
- 當(dāng)你不再需要使用 FastThreadLocal 中的對(duì)象時(shí),還是應(yīng)該調(diào)用 remove() 來避免內(nèi)存泄漏。
雖說在使用了 FastThreadLocalThread 實(shí)例的情況下,在線程結(jié)束時(shí),F(xiàn)astThreadLocal 會(huì)自動(dòng)清理所有線程局部變量。但顯式地調(diào)用 remove() 方法仍然是一個(gè)好的實(shí)踐。特別是在長生命周期的線程或者使用了線程池的情況下,顯式地清理線程局部變量可以幫助避免潛在的內(nèi)存泄漏問題。
以上就是Netty開發(fā)及粘包實(shí)戰(zhàn)解決分析的詳細(xì)內(nèi)容,更多關(guān)于Netty開發(fā)粘包解決的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
以上就是Netty開發(fā)及粘包實(shí)戰(zhàn)解決分析的詳細(xì)內(nèi)容,更多關(guān)于Netty開發(fā)粘包解決的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot結(jié)成MyBatis-Plus最全配置指南
本文主要介紹了Spring Boot結(jié)成MyBatis-Plus最全配置指南,包括依賴引入、配置數(shù)據(jù)源、Mapper 掃描、基本CRUD操作等,具有一定的參考價(jià)值,感興趣的可以了解一下2025-03-03spring基于通用Dao的多數(shù)據(jù)源配置詳解
這篇文章主要為大家詳細(xì)介紹了spring基于通用Dao的多數(shù)據(jù)源配置,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下解2018-03-03基于Spring Boot的Logback日志輪轉(zhuǎn)配置詳解
本篇文章主要介紹了基于Spring Boot的Logback日志輪轉(zhuǎn)配置詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10SpringBoot使用@SpringBootTest注解開發(fā)單元測(cè)試教程
這篇文章主要介紹了SpringBoot使用@SpringBootTest注解開發(fā)單元測(cè)試教程,本文通過詳細(xì)的案例過程來說明如何使用該項(xiàng)技術(shù),需要的朋友可以參考下2021-06-06java關(guān)于String.split("|")的使用方式
這篇文章主要介紹了java關(guān)于String.split("|")的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02RabbitMQ通過延遲插件實(shí)現(xiàn)延遲消息
在RabbitMQ中,使用延遲消息插件比死信隊(duì)列更優(yōu)化的實(shí)現(xiàn)消息的延遲發(fā)送,本文介紹了延遲插件的下載、安裝、以及如何通過設(shè)置消息頭x-delay實(shí)現(xiàn)消息的延遲投遞,特別指出,使用延遲消息可能會(huì)損耗性能,適合短時(shí)間的延遲場(chǎng)景2024-10-10Java中快速排序優(yōu)化技巧之隨機(jī)取樣、三數(shù)取中和插入排序
快速排序是一種常用的基于比較的排序算法,下面這篇文章主要給大家介紹了關(guān)于Java中快速排序優(yōu)化技巧之隨機(jī)取樣、三數(shù)取中和插入排序的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-09-09