SpringBoot中使用Netty的實現(xiàn)示例
一、什么是Netty
Netty 是 Java 中一個非常高性能的網(wǎng)絡(luò)通信框架,用來開發(fā)服務(wù)器和客戶端程序,主要用于處理 TCP/UDP 的網(wǎng)絡(luò)連接,比如:
聊天服務(wù)
實時推送
高并發(fā)網(wǎng)絡(luò)通信(比如游戲、IoT、金融系統(tǒng))
你可以把 Netty 理解為一種比 Java 原生 Socket 更方便、性能更強的“網(wǎng)絡(luò)搭建工具”。再詳細了解Netty的工作原理之前,我們先來看一下Java中最簡單的客戶端和服務(wù)器之間的連接。
二、最簡單的 Java 網(wǎng)絡(luò)通信
2.1什么是“客戶端”和“服務(wù)端”?
我們先理解一個現(xiàn)實生活的比喻:奶茶店點單系統(tǒng)
服務(wù)端(Netty 服務(wù)):奶茶店(固定位置,等待別人來點單)
客戶端(瀏覽器、手機 App、Netty 客戶端):顧客(誰想喝奶茶誰來)
通信方式(TCP):電話(通過電話點單)
還可以更加省略一點來說就是 ?? 一個人發(fā)送消息(客戶端) ? 另一個人接收并回復(服務(wù)端)
2.2服務(wù)端
import java.io.*; import java.net.*; public class Server { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8080); // 在8080端口等別人來找 System.out.println("服務(wù)端啟動,等待客戶端連接..."); Socket socket = serverSocket.accept(); // 有人來連接,就接收它 System.out.println("客戶端連接進來了"); // 輸入輸出流:用來讀寫數(shù)據(jù) BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 讀 PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 寫 String line; while ((line = in.readLine()) != null) { System.out.println("收到客戶端消息:" + line); out.println("我收到了:" + line); // 回給客戶端 } socket.close(); // 關(guān)閉連接 serverSocket.close(); } }
2.3客戶端
import java.io.*; import java.net.*; public class Client { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 8080); // 連接本機服務(wù)端 System.out.println("連接服務(wù)端成功!"); // 輸入輸出 BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in)); // 你鍵盤輸入 PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發(fā)消息 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息 String msg; while ((msg = userInput.readLine()) != null) { out.println(msg); // 發(fā)給服務(wù)端 String reply = in.readLine(); // 讀取服務(wù)端返回 System.out.println("服務(wù)端說:" + reply); } socket.close(); } }
2.4 服務(wù)端和客戶端之間的通信
首先是服務(wù)端先啟動,會有如下顯示,同時告訴顧客我家的店的端口號是8080。
服務(wù)端啟動,等待客戶端連接...
然后有顧客想買東西,通過 new Socket("127.0.0.1", 8080); // 連接本機服務(wù)端,即走進服務(wù)器店的大門8080。而在服務(wù)器這端,通過serverSocket.accept(); 看見有人來連接,就接收它,服務(wù)它。這時候客戶端會輸出如下
連接服務(wù)端成功!
服務(wù)端會輸出如下:
客戶端連接進來了
在客戶端通過控制臺輸入:hello后,即通過如下代碼接收到了你的輸入,并存放在userInput變量中。
BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in));
客戶端通過out對象發(fā)消息
PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發(fā)消息
客戶端通過in對象接受消息
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息
當 msg = userInput.readLine()) != null ,即當檢測到客戶端要發(fā)送消息就執(zhí)行如下代碼:
out.println(msg); // 發(fā)給服務(wù)端 String reply = in.readLine(); // 讀取服務(wù)端返回 System.out.println("服務(wù)端說:" + reply);
out.println(msg)后,就將信息發(fā)送到了服務(wù)器端,服務(wù)器端就會輸出如下
收到客戶端消息:hello
同時在服務(wù)器端通過 out.println("我收到了:" + line); 回給客戶端,客戶端通過reply接收到消息,客戶端就會輸出
服務(wù)端說:我收到了:hello
2.5 客戶端和服務(wù)器端的關(guān)系如下:
角色 | 作用 |
---|
Server | 永遠在等別人來(監(jiān)聽端口) |
Client | 主動發(fā)起連接 |
Input/Output | 收發(fā)消息用的“通道” |
二、為什么需要線程模型?(Thread Model)
理解了基礎(chǔ)的服務(wù)端和客戶端通信,我們可以繼續(xù)深入,了解一些稍微復雜一點的概念,即線程。
在前面那個簡單的服務(wù)端/客戶端例子中,服務(wù)端是“串行”的,意思是:
- 它在等待一個客戶端連接。
- 收到消息后再回復,接著等待下一個連接。
但是如果你有很多客戶端同時發(fā)消息,服務(wù)端就會變得很慢,因為它只能一個一個地處理請求。
所以,我們需要更高效的處理方式:并發(fā)編程并發(fā)編程意味著能夠同時處理多個任務(wù),不等一個任務(wù)完成再開始下一個。而且每個任務(wù)都不會相互阻塞。這就是 線程池 和 事件循環(huán)模型 的價值所在。在 Netty 中:
- 線程池:多個線程可以同時處理多個連接。
- 事件循環(huán)模型:每個線程(事件循環(huán))只負責自己的任務(wù),它會不停地輪詢事件,比如客戶端連接、數(shù)據(jù)讀取等。
三、什么是“阻塞”和“非阻塞”?
? 阻塞:你去餐廳吃飯,服務(wù)員給你一個菜單,但你必須等著他們準備好菜才能吃,期間你不能干別的事。
? 非阻塞:你點菜后,服務(wù)員會告訴你“稍等一會兒”,然后你可以做其他事。只要菜做好了,服務(wù)員會告訴你,打斷你做其他事,給你菜。
TCP 通信中的阻塞和非阻塞:
- 阻塞:當你發(fā)起連接或請求時,程序會一直等待,直到連接建立或數(shù)據(jù)返回。
- 非阻塞:發(fā)起請求后,程序不再等待,會繼續(xù)執(zhí)行其他任務(wù)。如果有返回結(jié)果,程序會處理返回。
Netty 默認就是 非阻塞 的,這樣它能同時處理很多連接,不會被一個請求堵住。
四、Netty 是如何處理高并發(fā)的?
Netty通過使用一個線程模型 EventLoop(事件循環(huán))來處理高并發(fā)。EventLoopGroup:管理多個線程(可以理解為多個服務(wù)員),負責處理網(wǎng)絡(luò)事件。EventLoop:每個線程負責自己的一部分任務(wù),比如處理某一個客戶端的請求。
舉例來看就是:
- 一個服務(wù)端線程,負責監(jiān)聽連接(等待“顧客”進店)。
- 多個工作線程,負責實際的通信(幫“顧客”點單、做菜)。
4.1 EventLoop 和 NIO 的關(guān)系
Netty 使用了 NIO(非阻塞 IO) 模型。NIO 讓一個線程能處理多個連接。具體來說:
- 使用 Selector 輪詢(檢查)每個連接的狀態(tài),看是否有數(shù)據(jù)到達。
- 使用 Channel 來表示網(wǎng)絡(luò)連接。
- 使用 Buffer 來讀取和寫入數(shù)據(jù)。
這個模型讓 Netty 在面對數(shù)千個并發(fā)連接時,也能保持高效。
總結(jié)來看,Netty的EventLoopGroup管理多個線程,每個線程只干特定的事情,假設(shè)某個線程只干連接客戶端這個事情,又由于Netty引入了NIO模型,所以又讓這個負責處理連接的線程具備了同時處理多個連接請求的能力。
五、實際的 Netty 服務(wù)端示例
public class EchoServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 負責接收連接 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 負責處理請求 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new EchoServerHandler()); ChannelFuture f = b.bind(8080).sync(); // 綁定端口,開始監(jiān)聽 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } // 處理客戶端發(fā)來的消息 public static class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收到數(shù)據(jù)后直接寫回給客戶端 System.out.println("收到消息:" + msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); // 出現(xiàn)異常關(guān)閉連接 } } }
- ServerBootstrap:是 Netty 中用于啟動服務(wù)端的核心類,啟動 Netty 服務(wù)端。
- bossGroup 和 workerGroup:管理事件循環(huán),分別處理接收連接和處理數(shù)據(jù)的任務(wù)。
- EchoServerHandler:是我們自定義的業(yè)務(wù)處理邏輯,收到客戶端的消息就原封不動地回傳。
六、實際使用的Netty
6.1 NettyServer類
ServerBootstrap:Netty服務(wù)器啟動的核心類。
ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer(delimiter, maxFrameLength)) .localAddress(socketAddress) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true);
- .group(bossGroup, workGroup) 配置監(jiān)聽線程和工作線程。
.channel(NioServerSocketChannel.class)
: 這里指定了服務(wù)端的 Channel 類型。NioServerSocketChannel
適用于 NIO(非阻塞 IO),這是一種處理高并發(fā)的方式。.childHandler(new ServerChannelInitializer(delimiter, maxFrameLength))
: 為每個連接配置一個ChannelInitializer
,在每個連接初始化時(每個客戶端連接時)會被調(diào)用。ServerChannelInitializer
是自定義的初始化類,配置如何處理數(shù)據(jù)的編解碼、業(yè)務(wù)邏輯等。.localAddress(socketAddress)
: 配置綁定的本地地址和端口.option(ChannelOption.SO_BACKLOG, 1024)
: 配置服務(wù)器端的連接隊列大小。隊列最大長度設(shè)置為 1024。.childOption(ChannelOption.SO_KEEPALIVE, true)
: 設(shè)置 TCP KeepAlive,確保連接在空閑時依然存活。
6.1.1啟動并綁定端口
ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
.bind(socketAddress)
: 綁定到指定的socketAddress
,開始監(jiān)聽客戶端的連接。.sync()
: 阻塞方法,直到端口綁定成功并啟動后,才會繼續(xù)執(zhí)行。ChannelFuture
用于獲取當前操作的結(jié)果(是否成功綁定)
6.2 SeverChannelInitializer類
在NettyServer類中,我們是調(diào)用了SeverChannelInitializer類的,我們使用SeverChannelInitializer類來配置如何處理數(shù)據(jù)的編解碼、業(yè)務(wù)邏輯等。當每個客戶端連接進來時,配置它的 Channel 的“流水線”——也就是這個連接收到/發(fā)送數(shù)據(jù)時,按什么順序怎么處理。可以把它理解為工廠生產(chǎn)線的“組裝說明書”。
package com....nettyService; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LoggingHandler; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { private String DELIMITER; private int MAXFRAMELENGTH; public ServerChannelInitializer(String delimiter, int maxFrameLength) { DELIMITER = delimiter; MAXFRAMELENGTH = maxFrameLength; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG")); socketChannel.pipeline().addLast("decoder", new HL7Decoder()); socketChannel.pipeline().addLast("encoder", new HL7Encoder()); socketChannel.pipeline().addLast(new NettyServerHandler()); } }
SeverChannelInitializer首先繼承了ChannelInitializer<SocketChannel>,這樣沒有一個新的連接的時候Netty 就會調(diào)用 initChannel()
方法,給這個連接安裝一套“處理器組合”(pipeline)。
而這一套“處理器組合”當接收到客戶端發(fā)送的消息執(zhí)行順序如下:
【客戶端】==> socketChannel
↓
[LoggingHandler](打印日志)
↓
[HL7Decoder](解碼消息)
↓
[NettyServerHandler](業(yè)務(wù)處理)
當服務(wù)端要回復消息,其執(zhí)行順序如下:
NettyServerHandler.write()
↓
[HL7Encoder](編碼為字節(jié))
↓
[LoggingHandler](打?。?br /> ↓
【客戶端】
6.3 NettySeverHandler類
在SeverChannelInitializer類中,其寫好了業(yè)務(wù)處理順序,在處理業(yè)務(wù)時,其處理業(yè)務(wù)的核心是NettySeverHandler類來實現(xiàn)的
package com.....nettyService; import com...component.commons.utils.BeanUtils; import com...emergency.service.impl.BS2800MPacketParse; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; @Component public class NettyServerHandler extends SimpleChannelInboundHandler<String> { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private BS2800MPacketParse bs2800MPacketParse = BeanUtils.getBean(BS2800MPacketParse.class); /** * 裝載所有客戶端channel的組 */ private static final Map<String, Channel> ipChannelMap = new HashMap<>(); /** * 客戶端連接過來會觸發(fā) */ @Override public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception { Channel channel = channelHandlerContext.channel(); ipChannelMap.put(channel.remoteAddress().toString(), channel); logger.info("客戶端連接:" + channelHandlerContext); } /** * 客戶端發(fā)消息過來會觸發(fā) */ @Override public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { Channel channel = channelHandlerContext.channel(); logger.info("服務(wù)端接收到客戶端消息"); // logger.info("發(fā)送消息的客戶端地址:" + channel.remoteAddress()); logger.info("發(fā)送消息的客戶端所發(fā)消息:" + msg); String result = msg; String msa = handleParams(channelHandlerContext, result); if (ObjectUtils.isNotEmpty(msa)) { channel.writeAndFlush(msa); } } @Override public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception { super.channelReadComplete(channelHandlerContext); } @Override public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception { Channel channel = channelHandlerContext.channel(); // 當通道變?yōu)榉腔顒訝顟B(tài)(斷開連接)時,將其從 ChannelGroup 中移除 String ip = channel.remoteAddress().toString(); if (ipChannelMap.containsKey(ip)) { ipChannelMap.remove(ip); if (!channel.isActive() || channel == null) { channelHandlerContext.close(); } } logger.info("客戶端地址為:" + ip + "的連接已斷開"); } /** * 發(fā)生異常觸發(fā) */ @Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) throws Exception { logger.warn(cause.toString()); } /** * 處理接收報文消息 */ public String handleParams(ChannelHandlerContext channelHandlerContext, String msg) { String msa = null; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); Channel channel = channelHandlerContext.channel(); if (channel.remoteAddress().toString().contains("10.10.51.213")) { if (ObjectUtils.isNotEmpty(msg)) { String result[] = msg.split("\r"); if (ObjectUtils.isNotEmpty(result) && result.length > 0) { String msh = null; for (String string : result) { if (string.contains("MSH")) { msh = string; } } if (msh.contains("ORU^R01")) { Date date = new Date(); String temp[] = msh.split("\\|", -1); if (ObjectUtils.isNotEmpty(temp) && temp.length > 9) { msa = "MSH|^~\\&|||||" + dateFormat.format(date) + "||ACK^R01|" + temp[9] + "|P|2.3.1||||0||ASCII|||"; String str = "MSA|AA|" + temp[9] + "|Message accepted|||0|"; msa = msa + "\r" + str; Map<String, String> paramMap = new HashMap<>(); paramMap.put(temp[9], msg); bs2800MPacketParse.parse(msg); return msa; } } } } } return msa; } }
6.3.1繼承
NettyServerHandler繼承SimpleChannelInboundHandler<String> 每次接收到客戶端消息(已經(jīng)是 String
類型,說明解碼器已完成解碼),就會觸發(fā) channelRead0()
方法。我們可以在這里處理邏輯、保存數(shù)據(jù)、做回復等
6.3.2channelActive
有客戶端連接進來時,Netty 會自動調(diào)用這個方法。將客戶端的 Channel 保存到 ipChannelMap
中,方便后面用 IP 找到連接。同時打印客戶端連接信息。
6.3.3channelRead0
每當客戶端發(fā)一條消息過來,就會自動執(zhí)行這里!先獲取當前的 Channel(對應(yīng)客戶端)
Channel channel = channelHandlerContext.channel();
打印日志,方便調(diào)試看到收到的數(shù)據(jù)
到此這篇關(guān)于SpringBoot中使用Netty的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot使用 Netty內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot接入netty實現(xiàn)在線統(tǒng)計人數(shù)
- SpringBoot整合Netty服務(wù)端的方法示例
- SpringBoot整合Netty服務(wù)端的實現(xiàn)示例
- SpringBoot如何集成Netty
- SpringBoot集成netty實現(xiàn)websocket通信功能
- SpringBoot整合Netty+Websocket實現(xiàn)消息推送的示例代碼
- SpringBoot整合Netty的流程步驟
- springboot整合netty框架實現(xiàn)站內(nèi)信
- Springboot整合Netty自定義協(xié)議實現(xiàn)示例詳解
- springboot整合netty框架的方式小結(jié)
相關(guān)文章
Java實現(xiàn)精準Excel數(shù)據(jù)排序的方法詳解
在數(shù)據(jù)處理或者數(shù)據(jù)分析的場景中,需要對已有的數(shù)據(jù)進行排序,在Excel中可以通過排序功能進行整理數(shù)據(jù),而在Java中,則可以借助Excel表格插件對數(shù)據(jù)進行批量排序,下面我們就來學習一下常見的數(shù)據(jù)排序方法吧2023-10-10spring-data-jpa中findOne與getOne的區(qū)別說明
這篇文章主要介紹了spring-data-jpa中findOne與getOne的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11Java class文件格式之數(shù)據(jù)類型_動力節(jié)點Java學院整理
這篇文章主要介紹了Java class文件格式之數(shù)據(jù)類型的相關(guān)資料,需要的朋友可以參考下2017-06-06SpringBoot中@Insert、@Update實現(xiàn)批量新增更新的使用示例
本文主要介紹了SpringBoot中@Insert、@Update實現(xiàn)批量新增更新的使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-10-10