利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)
本文介紹BIO,NIO,AIO 及如何使用Spring Boot集成Netty,實現(xiàn)后臺向前端推送信息的功能。利用Spring Boot簡化Netty的集成和配置。
1.BIO,NIO,AIO
BIO、NIO和AIO是Java編程語言中用于處理輸入輸出(IO)操作的三種不同的機制,它們分別代表 同步阻塞I/O,同步非阻塞I/O 和 異步非阻塞I/O。
1.1 BIO
BIO(Blocking IO) 是最傳統(tǒng)的IO模型,也稱為同步阻塞IO。它實現(xiàn)的是同步阻塞模型,即服務(wù)器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務(wù)器端就需要啟動一個線程進行處理。如果這個連接不做任何事情會造成不必要的線程開銷,并且線程在進行IO操作期間是被阻塞的,無法進行其他任務(wù)。在高并發(fā)環(huán)境下,BIO的性能較差,因為它需要為每個連接創(chuàng)建一個線程,而且線程切換開銷較大,不過可以通過線程池機制改善。BIO適合一些簡單的、低頻的、短連接的通信場景,例如HTTP請求。

1.2 NIO
NIO是Java 1.4引入的新IO模型,也稱為同步非阻塞IO,它提供了一種基于事件驅(qū)動的方式來處理I/O操作。
相比于傳統(tǒng)的BIO模型,NIO采用了Channel、Buffer和Selector等組件,線程可以對某個IO事件進行監(jiān)聽,并繼續(xù)執(zhí)行其他任務(wù),不需要阻塞等待。當(dāng)IO事件就緒時,線程會得到通知,然后可以進行相應(yīng)的操作,實現(xiàn)了非阻塞式的高伸縮性網(wǎng)絡(luò)通信。在NIO模型中,數(shù)據(jù)總是從Channel讀入Buffer,或者從Buffer寫入Channel,這種模式提高了IO效率,并且可以充分利用系統(tǒng)資源。
NIO主要由三部分組成:選擇器(Selector)、緩沖區(qū)(Buffer)和通道(Channel)。Channel是一個可以進行數(shù)據(jù)讀寫的對象,所有的數(shù)據(jù)都通過Buffer來處理,這種方式避免了直接將字節(jié)寫入通道中,而是將數(shù)據(jù)寫入包含一個或者多個字節(jié)的緩沖區(qū)。在多線程模式下,一個線程可以處理多個請求,這是通過將客戶端的連接請求注冊到多路復(fù)用器上,然后由多路復(fù)用器輪詢到連接有I/O請求時進行處理。
對于NIO,如果從特性來看,它是非阻塞式IO,N是Non-Blocking的意思;如果從技術(shù)角度,NIO對于BIO來說是一個新技術(shù),N的意思是New的意思。所以NIO也常常被稱作Non-Blocking I/O或New I/O。
NIO適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),例如聊天服務(wù)器、彈幕系統(tǒng)、服務(wù)器間通訊等。它通過引入非阻塞通道的概念,提高了系統(tǒng)的伸縮性和并發(fā)性能。同時,NIO的使用也簡化了程序編寫,提高了開發(fā)效率。

1.3 AIO
Java AIO(Asynchronous I/O)是Java提供的異步非阻塞IO編程模型,從Java 7版本開始支持,AIO又稱NIO 2.0。
相比于NIO模型,AIO模型更進一步地實現(xiàn)了異步非阻塞IO,提高了系統(tǒng)的并發(fā)性能和伸縮性。在NIO模型中,雖然可以通過多路復(fù)用器處理多個連接請求,但仍需要在每個連接上進行讀寫操作,這仍然存在一定的阻塞。而在AIO模型中,所有的IO操作都是異步的,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。
AIO模型有以下特性:
- 異步能力:AIO模型的最大特性是異步能力,對于socket和I/O操作都有效。讀寫操作都是異步的,完成后會自動調(diào)用回調(diào)函數(shù)。
- 回調(diào)函數(shù):在AIO模型中,當(dāng)一個異步操作完成后,會通知相關(guān)線程進行后續(xù)處理,這種處理方式稱為“回調(diào)”?;卣{(diào)函數(shù)可以由開發(fā)者自行定義,用于處理異步操作的結(jié)果。
- 非阻塞:AIO模型實現(xiàn)了完全的異步非阻塞IO,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。
- 高性能:由于AIO模型的異步能力和非阻塞特性,它可以更好地處理高并發(fā)、高伸縮性的網(wǎng)絡(luò)通信場景,進一步提高系統(tǒng)的性能和效率。
- 操作系統(tǒng)支持:AIO模型需要操作系統(tǒng)的支持,因此在不同的操作系統(tǒng)上可能會有不同的表現(xiàn)。在Linux內(nèi)核2.6版本之后增加了對真正異步IO的實現(xiàn)
2 Netty原理
2.1 Netty原理
Netty基于Java NIO(非阻塞IO)實現(xiàn),它采用事件驅(qū)動的編程模型,將IO操作抽象為事件,通過事件處理器來處理這些事件。Netty的主要組件包括:
- Bootstrap:用于啟動客戶端和服務(wù)器的引導(dǎo)類
- Channel:代表IO操作的通道,用于網(wǎng)絡(luò)讀寫操作
- ChannelHandler:用于處理IO事件的事件處理器
- EventLoopGroup:用于處理IO操作的多線程事件循環(huán)組
3 Spring Boot集成Netty和Websocket
在Spring Boot應(yīng)用程序中,我們可以通過集成Netty,實現(xiàn)后臺向前端推送信息的功能。首先,我們需要添加Netty依賴,然后在Spring Boot應(yīng)用程序中創(chuàng)建一個NettyServer類,用于初始化Websocket通道。
1.引入依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.111.Final</version>
</dependency>2.創(chuàng)建 NettyConfig 配置管理所有管道
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("all")
public class NettyConfig {
/**
* 定義全局單例channel組 管理所有channel
*/
private static volatile ChannelGroup channelGroup = null;
/**
* 存放請求ID與channel的對應(yīng)關(guān)系
*/
private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
/**
* 定義兩把鎖
*/
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static ChannelGroup getChannelGroup() {
if (null == channelGroup) {
synchronized (lock1) {
if (null == channelGroup) {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
}
}
return channelGroup;
}
public static ConcurrentHashMap<String, Channel> getChannelMap() {
if (null == channelMap) {
synchronized (lock2) {
if (null == channelMap) {
channelMap = new ConcurrentHashMap<>();
}
}
}
return channelMap;
}
public static Channel getChannel(String userId) {
if (null == channelMap) {
return getChannelMap().get(userId);
}
return channelMap.get(userId);
}
}3.創(chuàng)建MyChannelHandlerPool 通道組池
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* MyChannelHandlerPool
* 通道組池,管理所有websocket連接
*/
public class MyChannelHandlerPool {
private MyChannelHandlerPool(){}
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
4.創(chuàng)建NettyServer 初始化Netty
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* NettyServer Netty服務(wù)器配置
*/
@Slf4j
@Component
@SuppressWarnings("all")
public class NettyServer {
private String url = "/admin/socket";
public NettyServer() {}
public void start() throws Exception {
// 主事件組
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 執(zhí)行事件組
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
int port = 12345;
serverBootstrap.group(group, bossGroup) // 綁定線程池
.channel(NioServerSocketChannel.class) // 指定使用的channel
.localAddress(port)// 綁定監(jiān)聽端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶端連接時候觸發(fā)操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加ObjectEncoder和ObjectDecoder來處理對象的序列化和反序列化
log.info("收到新連接");
//websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
ch.pipeline().addLast(new HttpServerCodec());
//以塊的方式來寫的處理器
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(new WebSocketHandler());
ch.pipeline().addLast(new WebSocketServerProtocolHandler(url, null, true, 65536 * 10));
}
});
// 綁定端口并同步等待直到綁定完成
ChannelFuture future = serverBootstrap.bind().sync();
log.info(NettyServer.class.getName() + "啟動正在監(jiān)聽: " + future.channel().localAddress());
// 等待服務(wù)器通道關(guān)閉
future.channel().closeFuture().sync();
} finally {
// 釋放線程池資源
group.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
}
}
}5. 創(chuàng)建 WebSocketHandler 執(zhí)行任務(wù)
@Slf4j
@Component
@SuppressWarnings("all")
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static final String NETTY_START = "Netty-start";
public WebSocketHandler() {}
private ScheduledFuture<?> sendDataTask;
@Autowired
private DeviceLevelFourService deviceLevelFourService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//添加到channelGroup通道組
MyChannelHandlerPool.channelGroup.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("與客戶端斷開連接,通道關(guān)閉!");
//添加到channelGroup 通道組
MyChannelHandlerPool.channelGroup.remove(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 首次連接是FullHttpRequest,處理參數(shù)
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
Map<String, String> paramMap = getUrlParams(uri);
log.info("接收到的參數(shù)是:" + JSON.toJSONString(paramMap));
// 如果url包含參數(shù),需要處理
if (uri.contains("?")) {
String newUri = uri.substring(0, uri.indexOf("?"));
log.info(newUri);
request.setUri(newUri);
}
// 當(dāng)連接建立時,啟動定時任務(wù)
sendDataTask = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (deviceLevelFourService == null) {
deviceLevelFourService = SpringContextUtil.getBean(DeviceLevelFourServiceImpl.class);
}
// 此處可以接收路徑參數(shù) ,直接獲取前端傳遞參數(shù)
// "ws://localhost:12345/admin/socket?id=1"
String deviceId = paramMap.get("id");
/**
* -------此處為自己的數(shù)據(jù)---------
*/
// 調(diào)用service 得到前端需要的數(shù)據(jù),用JSON工具類轉(zhuǎn)換推送到前端
List<DeviceNettyData> deviceNettyData = deviceLevelFourService.handlerDeviceData(Long.parseLong(deviceId));
String json = JSON.toJSONString(deviceNettyData, SerializerFeature.WriteMapNullValue);
log.info(json);
// 將 JSON 字符串封裝為 TextWebSocketFrame
TextWebSocketFrame frameNetty = new TextWebSocketFrame(json);
ctx.writeAndFlush(frameNetty); // 發(fā)送 WebSocket 幀
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}, 0, 30, TimeUnit.SECONDS); // 立即開始,每30秒發(fā)送一次
// 調(diào)用父類方法,處理下一個handler
super.channelRead(ctx, request);
} else if (msg instanceof TextWebSocketFrame frame) {
// 正常的TEXT消息類型
sendAllMessage(frame.text());
// 繼續(xù)傳遞給后續(xù)handler
super.channelRead(ctx, frame);
} else {
// 如果消息類型不匹配,記錄警告或處理異常情況
log.error("未處理的消息類型:" + msg.getClass());
super.channelRead(ctx, msg); // 仍然傳遞給后續(xù)處理
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
log.info(channelHandlerContext.name());
}
//讀取完成刷新
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
//異常則關(guān)閉ChannelHandlerContext連接
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
private void sendAllMessage(String message){
//收到信息后,群發(fā)給所有channel
MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
}
private static Map<String ,String> getUrlParams(String url){
Map<String,String> map = new HashMap<>();
url = url.replace("?",";");
if (!url.contains(";")){
return map;
}
if (url.split(";").length > 0){
String[] arr = url.split(";")[1].split("&");
for (String s : arr){
String key = s.split("=")[0];
String value = s.split("=")[1];
map.put(key,value);
}
return map;
}else{
return map;
}
}
}6.創(chuàng)建NettyServerRunner 用來使用新線程調(diào)用NetterServer
@Slf4j
public class NettyServerRunner implements Runnable {
@Override
public void run() {
try {
new NettyServer().start();
} catch (Exception e) {
// 使用Logger進行日志記錄
log.error(e.getMessage(), e);
}
}
}7. 最后可以在隨意注入到spring容器類中,在項目啟動時候調(diào)用,也可以在其他訪問接口事件調(diào)用
// 方式一
@PostConstruct
public void init(){
// 在新的線程中運行Netty服務(wù)器
Thread thread = new Thread(new NettyServerRunner());
thread.start();
}
// 方式二調(diào)用
@GetMapping("/test/{id}")
public R getDevice(@PathVariable Long id){
// 在新的線程中運行Netty服務(wù)器
Thread thread = new Thread(new NettyServerRunner());
thread.start();
return deviceService.getDeviceAlarmCount(deviceId);
}注意:
使用新線程時候,spring容器注入的對象為空,容易產(chǎn)生空指針異常,可以借鑒WebSocketHandler 類中方法,重新從spring容器中獲取需要的對象。
總結(jié)
到此這篇關(guān)于利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot定時后端向前端推送數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā)編程之關(guān)鍵字volatile知識總結(jié)
今天帶大家學(xué)習(xí)java的相關(guān)知識,文章圍繞著Java關(guān)鍵字volatile展開,文中有非常詳細的知識總結(jié),需要的朋友可以參考下2021-06-06
IDEA SpringBoot項目配置熱更新的步驟詳解(無需每次手動重啟服務(wù)器)
這篇文章主要介紹了IDEA SpringBoot項目配置熱更新的步驟,無需每次手動重啟服務(wù)器,本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04
springboot整合Mybatis-plus的實現(xiàn)
這篇文章主要介紹了springboot整合Mybatis-plus的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
SpringBoot優(yōu)雅地實現(xiàn)全局異常處理的方法詳解
這篇文章主要為大家詳細介紹了SpringBoot如何優(yōu)雅地實現(xiàn)全局異常處理,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-08-08
Springboot通過請求頭獲取當(dāng)前用戶信息方法詳細示范
這篇文章主要介紹了Springboot通過請求頭獲取當(dāng)前用戶信息的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-11-11

