SpringBoot中使用Netty的實(shí)現(xiàn)示例
一、什么是Netty
Netty 是 Java 中一個(gè)非常高性能的網(wǎng)絡(luò)通信框架,用來開發(fā)服務(wù)器和客戶端程序,主要用于處理 TCP/UDP 的網(wǎng)絡(luò)連接,比如:
聊天服務(wù)
實(shí)時(shí)推送
高并發(fā)網(wǎng)絡(luò)通信(比如游戲、IoT、金融系統(tǒng))
你可以把 Netty 理解為一種比 Java 原生 Socket 更方便、性能更強(qiáng)的“網(wǎng)絡(luò)搭建工具”。再詳細(xì)了解Netty的工作原理之前,我們先來看一下Java中最簡單的客戶端和服務(wù)器之間的連接。
二、最簡單的 Java 網(wǎng)絡(luò)通信
2.1什么是“客戶端”和“服務(wù)端”?
我們先理解一個(gè)現(xiàn)實(shí)生活的比喻:奶茶店點(diǎn)單系統(tǒng)
服務(wù)端(Netty 服務(wù)):奶茶店(固定位置,等待別人來點(diǎn)單)
客戶端(瀏覽器、手機(jī) App、Netty 客戶端):顧客(誰想喝奶茶誰來)
通信方式(TCP):電話(通過電話點(diǎn)單)
還可以更加省略一點(diǎn)來說就是 ?? 一個(gè)人發(fā)送消息(客戶端) ? 另一個(gè)人接收并回復(fù)(服務(wù)端)
2.2服務(wù)端
import java.io.*; import java.net.*; public class Server { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8080); // 在8080端口等別人來找 System.out.println("服務(wù)端啟動(dòng),等待客戶端連接..."); Socket socket = serverSocket.accept(); // 有人來連接,就接收它 System.out.println("客戶端連接進(jìn)來了"); // 輸入輸出流:用來讀寫數(shù)據(jù) BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 讀 PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 寫 String line; while ((line = in.readLine()) != null) { System.out.println("收到客戶端消息:" + line); out.println("我收到了:" + line); // 回給客戶端 } socket.close(); // 關(guān)閉連接 serverSocket.close(); } }
2.3客戶端
import java.io.*; import java.net.*; public class Client { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 8080); // 連接本機(jī)服務(wù)端 System.out.println("連接服務(wù)端成功!"); // 輸入輸出 BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in)); // 你鍵盤輸入 PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發(fā)消息 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息 String msg; while ((msg = userInput.readLine()) != null) { out.println(msg); // 發(fā)給服務(wù)端 String reply = in.readLine(); // 讀取服務(wù)端返回 System.out.println("服務(wù)端說:" + reply); } socket.close(); } }
2.4 服務(wù)端和客戶端之間的通信
首先是服務(wù)端先啟動(dòng),會(huì)有如下顯示,同時(shí)告訴顧客我家的店的端口號(hào)是8080。
服務(wù)端啟動(dòng),等待客戶端連接...
然后有顧客想買東西,通過 new Socket("127.0.0.1", 8080); // 連接本機(jī)服務(wù)端,即走進(jìn)服務(wù)器店的大門8080。而在服務(wù)器這端,通過serverSocket.accept(); 看見有人來連接,就接收它,服務(wù)它。這時(shí)候客戶端會(huì)輸出如下
連接服務(wù)端成功!
服務(wù)端會(huì)輸出如下:
客戶端連接進(jìn)來了
在客戶端通過控制臺(tái)輸入:hello后,即通過如下代碼接收到了你的輸入,并存放在userInput變量中。
BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in));
客戶端通過out對(duì)象發(fā)消息
PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發(fā)消息
客戶端通過in對(duì)象接受消息
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息
當(dāng) msg = userInput.readLine()) != null ,即當(dāng)檢測到客戶端要發(fā)送消息就執(zhí)行如下代碼:
out.println(msg); // 發(fā)給服務(wù)端 String reply = in.readLine(); // 讀取服務(wù)端返回 System.out.println("服務(wù)端說:" + reply);
out.println(msg)后,就將信息發(fā)送到了服務(wù)器端,服務(wù)器端就會(huì)輸出如下
收到客戶端消息:hello
同時(shí)在服務(wù)器端通過 out.println("我收到了:" + line); 回給客戶端,客戶端通過reply接收到消息,客戶端就會(huì)輸出
服務(wù)端說:我收到了:hello
2.5 客戶端和服務(wù)器端的關(guān)系如下:
角色 | 作用 |
---|
Server | 永遠(yuǎn)在等別人來(監(jiān)聽端口) |
Client | 主動(dòng)發(fā)起連接 |
Input/Output | 收發(fā)消息用的“通道” |
二、為什么需要線程模型?(Thread Model)
理解了基礎(chǔ)的服務(wù)端和客戶端通信,我們可以繼續(xù)深入,了解一些稍微復(fù)雜一點(diǎn)的概念,即線程。
在前面那個(gè)簡單的服務(wù)端/客戶端例子中,服務(wù)端是“串行”的,意思是:
- 它在等待一個(gè)客戶端連接。
- 收到消息后再回復(fù),接著等待下一個(gè)連接。
但是如果你有很多客戶端同時(shí)發(fā)消息,服務(wù)端就會(huì)變得很慢,因?yàn)樗荒芤粋€(gè)一個(gè)地處理請(qǐng)求。
所以,我們需要更高效的處理方式:并發(fā)編程并發(fā)編程意味著能夠同時(shí)處理多個(gè)任務(wù),不等一個(gè)任務(wù)完成再開始下一個(gè)。而且每個(gè)任務(wù)都不會(huì)相互阻塞。這就是 線程池 和 事件循環(huán)模型 的價(jià)值所在。在 Netty 中:
- 線程池:多個(gè)線程可以同時(shí)處理多個(gè)連接。
- 事件循環(huán)模型:每個(gè)線程(事件循環(huán))只負(fù)責(zé)自己的任務(wù),它會(huì)不停地輪詢事件,比如客戶端連接、數(shù)據(jù)讀取等。
三、什么是“阻塞”和“非阻塞”?
? 阻塞:你去餐廳吃飯,服務(wù)員給你一個(gè)菜單,但你必須等著他們準(zhǔn)備好菜才能吃,期間你不能干別的事。
? 非阻塞:你點(diǎn)菜后,服務(wù)員會(huì)告訴你“稍等一會(huì)兒”,然后你可以做其他事。只要菜做好了,服務(wù)員會(huì)告訴你,打斷你做其他事,給你菜。
TCP 通信中的阻塞和非阻塞:
- 阻塞:當(dāng)你發(fā)起連接或請(qǐng)求時(shí),程序會(huì)一直等待,直到連接建立或數(shù)據(jù)返回。
- 非阻塞:發(fā)起請(qǐng)求后,程序不再等待,會(huì)繼續(xù)執(zhí)行其他任務(wù)。如果有返回結(jié)果,程序會(huì)處理返回。
Netty 默認(rèn)就是 非阻塞 的,這樣它能同時(shí)處理很多連接,不會(huì)被一個(gè)請(qǐng)求堵住。
四、Netty 是如何處理高并發(fā)的?
Netty通過使用一個(gè)線程模型 EventLoop(事件循環(huán))來處理高并發(fā)。EventLoopGroup:管理多個(gè)線程(可以理解為多個(gè)服務(wù)員),負(fù)責(zé)處理網(wǎng)絡(luò)事件。EventLoop:每個(gè)線程負(fù)責(zé)自己的一部分任務(wù),比如處理某一個(gè)客戶端的請(qǐng)求。
舉例來看就是:
- 一個(gè)服務(wù)端線程,負(fù)責(zé)監(jiān)聽連接(等待“顧客”進(jìn)店)。
- 多個(gè)工作線程,負(fù)責(zé)實(shí)際的通信(幫“顧客”點(diǎn)單、做菜)。
4.1 EventLoop 和 NIO 的關(guān)系
Netty 使用了 NIO(非阻塞 IO) 模型。NIO 讓一個(gè)線程能處理多個(gè)連接。具體來說:
- 使用 Selector 輪詢(檢查)每個(gè)連接的狀態(tài),看是否有數(shù)據(jù)到達(dá)。
- 使用 Channel 來表示網(wǎng)絡(luò)連接。
- 使用 Buffer 來讀取和寫入數(shù)據(jù)。
這個(gè)模型讓 Netty 在面對(duì)數(shù)千個(gè)并發(fā)連接時(shí),也能保持高效。
總結(jié)來看,Netty的EventLoopGroup管理多個(gè)線程,每個(gè)線程只干特定的事情,假設(shè)某個(gè)線程只干連接客戶端這個(gè)事情,又由于Netty引入了NIO模型,所以又讓這個(gè)負(fù)責(zé)處理連接的線程具備了同時(shí)處理多個(gè)連接請(qǐng)求的能力。
五、實(shí)際的 Netty 服務(wù)端示例
public class EchoServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 負(fù)責(zé)接收連接 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 負(fù)責(zé)處理請(qǐng)求 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new EchoServerHandler()); ChannelFuture f = b.bind(8080).sync(); // 綁定端口,開始監(jiān)聽 f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } // 處理客戶端發(fā)來的消息 public static class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 接收到數(shù)據(jù)后直接寫回給客戶端 System.out.println("收到消息:" + msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); // 出現(xiàn)異常關(guān)閉連接 } } }
- ServerBootstrap:是 Netty 中用于啟動(dòng)服務(wù)端的核心類,啟動(dòng) Netty 服務(wù)端。
- bossGroup 和 workerGroup:管理事件循環(huán),分別處理接收連接和處理數(shù)據(jù)的任務(wù)。
- EchoServerHandler:是我們自定義的業(yè)務(wù)處理邏輯,收到客戶端的消息就原封不動(dòng)地回傳。
六、實(shí)際使用的Netty
6.1 NettyServer類
ServerBootstrap:Netty服務(wù)器啟動(dòng)的核心類。
ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer(delimiter, maxFrameLength)) .localAddress(socketAddress) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true);
- .group(bossGroup, workGroup) 配置監(jiān)聽線程和工作線程。
.channel(NioServerSocketChannel.class)
: 這里指定了服務(wù)端的 Channel 類型。NioServerSocketChannel
適用于 NIO(非阻塞 IO),這是一種處理高并發(fā)的方式。.childHandler(new ServerChannelInitializer(delimiter, maxFrameLength))
: 為每個(gè)連接配置一個(gè)ChannelInitializer
,在每個(gè)連接初始化時(shí)(每個(gè)客戶端連接時(shí))會(huì)被調(diào)用。ServerChannelInitializer
是自定義的初始化類,配置如何處理數(shù)據(jù)的編解碼、業(yè)務(wù)邏輯等。.localAddress(socketAddress)
: 配置綁定的本地地址和端口.option(ChannelOption.SO_BACKLOG, 1024)
: 配置服務(wù)器端的連接隊(duì)列大小。隊(duì)列最大長度設(shè)置為 1024。.childOption(ChannelOption.SO_KEEPALIVE, true)
: 設(shè)置 TCP KeepAlive,確保連接在空閑時(shí)依然存活。
6.1.1啟動(dòng)并綁定端口
ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
.bind(socketAddress)
: 綁定到指定的socketAddress
,開始監(jiān)聽客戶端的連接。.sync()
: 阻塞方法,直到端口綁定成功并啟動(dòng)后,才會(huì)繼續(xù)執(zhí)行。ChannelFuture
用于獲取當(dāng)前操作的結(jié)果(是否成功綁定)
6.2 SeverChannelInitializer類
在NettyServer類中,我們是調(diào)用了SeverChannelInitializer類的,我們使用SeverChannelInitializer類來配置如何處理數(shù)據(jù)的編解碼、業(yè)務(wù)邏輯等。當(dāng)每個(gè)客戶端連接進(jìn)來時(shí),配置它的 Channel 的“流水線”——也就是這個(gè)連接收到/發(fā)送數(shù)據(jù)時(shí),按什么順序怎么處理。可以把它理解為工廠生產(chǎn)線的“組裝說明書”。
package com....nettyService; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LoggingHandler; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { private String DELIMITER; private int MAXFRAMELENGTH; public ServerChannelInitializer(String delimiter, int maxFrameLength) { DELIMITER = delimiter; MAXFRAMELENGTH = maxFrameLength; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG")); socketChannel.pipeline().addLast("decoder", new HL7Decoder()); socketChannel.pipeline().addLast("encoder", new HL7Encoder()); socketChannel.pipeline().addLast(new NettyServerHandler()); } }
SeverChannelInitializer首先繼承了ChannelInitializer<SocketChannel>,這樣沒有一個(gè)新的連接的時(shí)候Netty 就會(huì)調(diào)用 initChannel()
方法,給這個(gè)連接安裝一套“處理器組合”(pipeline)。
而這一套“處理器組合”當(dāng)接收到客戶端發(fā)送的消息執(zhí)行順序如下:
【客戶端】==> socketChannel
↓
[LoggingHandler](打印日志)
↓
[HL7Decoder](解碼消息)
↓
[NettyServerHandler](業(yè)務(wù)處理)
當(dāng)服務(wù)端要回復(fù)消息,其執(zhí)行順序如下:
NettyServerHandler.write()
↓
[HL7Encoder](編碼為字節(jié))
↓
[LoggingHandler](打?。?br /> ↓
【客戶端】
6.3 NettySeverHandler類
在SeverChannelInitializer類中,其寫好了業(yè)務(wù)處理順序,在處理業(yè)務(wù)時(shí),其處理業(yè)務(wù)的核心是NettySeverHandler類來實(shí)現(xiàn)的
package com.....nettyService; import com...component.commons.utils.BeanUtils; import com...emergency.service.impl.BS2800MPacketParse; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; @Component public class NettyServerHandler extends SimpleChannelInboundHandler<String> { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private BS2800MPacketParse bs2800MPacketParse = BeanUtils.getBean(BS2800MPacketParse.class); /** * 裝載所有客戶端channel的組 */ private static final Map<String, Channel> ipChannelMap = new HashMap<>(); /** * 客戶端連接過來會(huì)觸發(fā) */ @Override public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception { Channel channel = channelHandlerContext.channel(); ipChannelMap.put(channel.remoteAddress().toString(), channel); logger.info("客戶端連接:" + channelHandlerContext); } /** * 客戶端發(fā)消息過來會(huì)觸發(fā) */ @Override public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { Channel channel = channelHandlerContext.channel(); logger.info("服務(wù)端接收到客戶端消息"); // logger.info("發(fā)送消息的客戶端地址:" + channel.remoteAddress()); logger.info("發(fā)送消息的客戶端所發(fā)消息:" + msg); String result = msg; String msa = handleParams(channelHandlerContext, result); if (ObjectUtils.isNotEmpty(msa)) { channel.writeAndFlush(msa); } } @Override public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception { super.channelReadComplete(channelHandlerContext); } @Override public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception { Channel channel = channelHandlerContext.channel(); // 當(dāng)通道變?yōu)榉腔顒?dòng)狀態(tài)(斷開連接)時(shí),將其從 ChannelGroup 中移除 String ip = channel.remoteAddress().toString(); if (ipChannelMap.containsKey(ip)) { ipChannelMap.remove(ip); if (!channel.isActive() || channel == null) { channelHandlerContext.close(); } } logger.info("客戶端地址為:" + ip + "的連接已斷開"); } /** * 發(fā)生異常觸發(fā) */ @Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) throws Exception { logger.warn(cause.toString()); } /** * 處理接收?qǐng)?bào)文消息 */ public String handleParams(ChannelHandlerContext channelHandlerContext, String msg) { String msa = null; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); Channel channel = channelHandlerContext.channel(); if (channel.remoteAddress().toString().contains("10.10.51.213")) { if (ObjectUtils.isNotEmpty(msg)) { String result[] = msg.split("\r"); if (ObjectUtils.isNotEmpty(result) && result.length > 0) { String msh = null; for (String string : result) { if (string.contains("MSH")) { msh = string; } } if (msh.contains("ORU^R01")) { Date date = new Date(); String temp[] = msh.split("\\|", -1); if (ObjectUtils.isNotEmpty(temp) && temp.length > 9) { msa = "MSH|^~\\&|||||" + dateFormat.format(date) + "||ACK^R01|" + temp[9] + "|P|2.3.1||||0||ASCII|||"; String str = "MSA|AA|" + temp[9] + "|Message accepted|||0|"; msa = msa + "\r" + str; Map<String, String> paramMap = new HashMap<>(); paramMap.put(temp[9], msg); bs2800MPacketParse.parse(msg); return msa; } } } } } return msa; } }
6.3.1繼承
NettyServerHandler繼承SimpleChannelInboundHandler<String> 每次接收到客戶端消息(已經(jīng)是 String
類型,說明解碼器已完成解碼),就會(huì)觸發(fā) channelRead0()
方法。我們可以在這里處理邏輯、保存數(shù)據(jù)、做回復(fù)等
6.3.2channelActive
有客戶端連接進(jìn)來時(shí),Netty 會(huì)自動(dòng)調(diào)用這個(gè)方法。將客戶端的 Channel 保存到 ipChannelMap
中,方便后面用 IP 找到連接。同時(shí)打印客戶端連接信息。
6.3.3channelRead0
每當(dāng)客戶端發(fā)一條消息過來,就會(huì)自動(dòng)執(zhí)行這里!先獲取當(dāng)前的 Channel(對(duì)應(yīng)客戶端)
Channel channel = channelHandlerContext.channel();
打印日志,方便調(diào)試看到收到的數(shù)據(jù)
到此這篇關(guān)于SpringBoot中使用Netty的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot使用 Netty內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot接入netty實(shí)現(xiàn)在線統(tǒng)計(jì)人數(shù)
- SpringBoot整合Netty服務(wù)端的方法示例
- SpringBoot整合Netty服務(wù)端的實(shí)現(xiàn)示例
- SpringBoot如何集成Netty
- SpringBoot集成netty實(shí)現(xiàn)websocket通信功能
- SpringBoot整合Netty+Websocket實(shí)現(xiàn)消息推送的示例代碼
- SpringBoot整合Netty的流程步驟
- springboot整合netty框架實(shí)現(xiàn)站內(nèi)信
- Springboot整合Netty自定義協(xié)議實(shí)現(xiàn)示例詳解
- springboot整合netty框架的方式小結(jié)
相關(guān)文章
Java實(shí)現(xiàn)精準(zhǔn)Excel數(shù)據(jù)排序的方法詳解
在數(shù)據(jù)處理或者數(shù)據(jù)分析的場景中,需要對(duì)已有的數(shù)據(jù)進(jìn)行排序,在Excel中可以通過排序功能進(jìn)行整理數(shù)據(jù),而在Java中,則可以借助Excel表格插件對(duì)數(shù)據(jù)進(jìn)行批量排序,下面我們就來學(xué)習(xí)一下常見的數(shù)據(jù)排序方法吧2023-10-10spring-data-jpa中findOne與getOne的區(qū)別說明
這篇文章主要介紹了spring-data-jpa中findOne與getOne的區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java class文件格式之?dāng)?shù)據(jù)類型_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java class文件格式之?dāng)?shù)據(jù)類型的相關(guān)資料,需要的朋友可以參考下2017-06-06JAVA提高第八篇 動(dòng)態(tài)代理技術(shù)
這篇文章主要為大家詳細(xì)介紹了JAVA動(dòng)態(tài)代理技術(shù)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10SpringBoot中@Insert、@Update實(shí)現(xiàn)批量新增更新的使用示例
本文主要介紹了SpringBoot中@Insert、@Update實(shí)現(xiàn)批量新增更新的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-10-10