利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)
本文介紹BIO,NIO,AIO 及如何使用Spring Boot集成Netty,實現(xiàn)后臺向前端推送信息的功能。利用Spring Boot簡化Netty的集成和配置。
1.BIO,NIO,AIO
BIO、NIO和AIO是Java編程語言中用于處理輸入輸出(IO)操作的三種不同的機制,它們分別代表 同步阻塞I/O,同步非阻塞I/O 和 異步非阻塞I/O。
1.1 BIO
BIO(Blocking IO) 是最傳統(tǒng)的IO模型,也稱為同步阻塞IO。它實現(xiàn)的是同步阻塞模型,即服務器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理。如果這個連接不做任何事情會造成不必要的線程開銷,并且線程在進行IO操作期間是被阻塞的,無法進行其他任務。在高并發(fā)環(huán)境下,BIO的性能較差,因為它需要為每個連接創(chuàng)建一個線程,而且線程切換開銷較大,不過可以通過線程池機制改善。BIO適合一些簡單的、低頻的、短連接的通信場景,例如HTTP請求。
1.2 NIO
NIO是Java 1.4引入的新IO模型,也稱為同步非阻塞IO,它提供了一種基于事件驅(qū)動的方式來處理I/O操作。
相比于傳統(tǒng)的BIO模型,NIO采用了Channel、Buffer和Selector等組件,線程可以對某個IO事件進行監(jiān)聽,并繼續(xù)執(zhí)行其他任務,不需要阻塞等待。當IO事件就緒時,線程會得到通知,然后可以進行相應的操作,實現(xiàn)了非阻塞式的高伸縮性網(wǎng)絡通信。在NIO模型中,數(shù)據(jù)總是從Channel讀入Buffer,或者從Buffer寫入Channel,這種模式提高了IO效率,并且可以充分利用系統(tǒng)資源。
NIO主要由三部分組成:選擇器(Selector)、緩沖區(qū)(Buffer)和通道(Channel)。Channel是一個可以進行數(shù)據(jù)讀寫的對象,所有的數(shù)據(jù)都通過Buffer來處理,這種方式避免了直接將字節(jié)寫入通道中,而是將數(shù)據(jù)寫入包含一個或者多個字節(jié)的緩沖區(qū)。在多線程模式下,一個線程可以處理多個請求,這是通過將客戶端的連接請求注冊到多路復用器上,然后由多路復用器輪詢到連接有I/O請求時進行處理。
對于NIO,如果從特性來看,它是非阻塞式IO,N是Non-Blocking的意思;如果從技術(shù)角度,NIO對于BIO來說是一個新技術(shù),N的意思是New的意思。所以NIO也常常被稱作Non-Blocking I/O或New I/O。
NIO適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),例如聊天服務器、彈幕系統(tǒng)、服務器間通訊等。它通過引入非阻塞通道的概念,提高了系統(tǒng)的伸縮性和并發(fā)性能。同時,NIO的使用也簡化了程序編寫,提高了開發(fā)效率。
1.3 AIO
Java AIO(Asynchronous I/O)是Java提供的異步非阻塞IO編程模型,從Java 7版本開始支持,AIO又稱NIO 2.0。
相比于NIO模型,AIO模型更進一步地實現(xiàn)了異步非阻塞IO,提高了系統(tǒng)的并發(fā)性能和伸縮性。在NIO模型中,雖然可以通過多路復用器處理多個連接請求,但仍需要在每個連接上進行讀寫操作,這仍然存在一定的阻塞。而在AIO模型中,所有的IO操作都是異步的,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。
AIO模型有以下特性:
- 異步能力:AIO模型的最大特性是異步能力,對于socket和I/O操作都有效。讀寫操作都是異步的,完成后會自動調(diào)用回調(diào)函數(shù)。
- 回調(diào)函數(shù):在AIO模型中,當一個異步操作完成后,會通知相關(guān)線程進行后續(xù)處理,這種處理方式稱為“回調(diào)”?;卣{(diào)函數(shù)可以由開發(fā)者自行定義,用于處理異步操作的結(jié)果。
- 非阻塞:AIO模型實現(xiàn)了完全的異步非阻塞IO,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。
- 高性能:由于AIO模型的異步能力和非阻塞特性,它可以更好地處理高并發(fā)、高伸縮性的網(wǎng)絡通信場景,進一步提高系統(tǒng)的性能和效率。
- 操作系統(tǒng)支持:AIO模型需要操作系統(tǒng)的支持,因此在不同的操作系統(tǒng)上可能會有不同的表現(xiàn)。在Linux內(nèi)核2.6版本之后增加了對真正異步IO的實現(xiàn)
2 Netty原理
2.1 Netty原理
Netty基于Java NIO(非阻塞IO)實現(xiàn),它采用事件驅(qū)動的編程模型,將IO操作抽象為事件,通過事件處理器來處理這些事件。Netty的主要組件包括:
- Bootstrap:用于啟動客戶端和服務器的引導類
- Channel:代表IO操作的通道,用于網(wǎng)絡讀寫操作
- ChannelHandler:用于處理IO事件的事件處理器
- EventLoopGroup:用于處理IO操作的多線程事件循環(huán)組
3 Spring Boot集成Netty和Websocket
在Spring Boot應用程序中,我們可以通過集成Netty,實現(xiàn)后臺向前端推送信息的功能。首先,我們需要添加Netty依賴,然后在Spring Boot應用程序中創(chuàng)建一個NettyServer類,用于初始化Websocket通道。
1.引入依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.111.Final</version> </dependency>
2.創(chuàng)建 NettyConfig 配置管理所有管道
import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("all") public class NettyConfig { /** * 定義全局單例channel組 管理所有channel */ private static volatile ChannelGroup channelGroup = null; /** * 存放請求ID與channel的對應關(guān)系 */ private static volatile ConcurrentHashMap<String, Channel> channelMap = null; /** * 定義兩把鎖 */ private static final Object lock1 = new Object(); private static final Object lock2 = new Object(); public static ChannelGroup getChannelGroup() { if (null == channelGroup) { synchronized (lock1) { if (null == channelGroup) { channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } } } return channelGroup; } public static ConcurrentHashMap<String, Channel> getChannelMap() { if (null == channelMap) { synchronized (lock2) { if (null == channelMap) { channelMap = new ConcurrentHashMap<>(); } } } return channelMap; } public static Channel getChannel(String userId) { if (null == channelMap) { return getChannelMap().get(userId); } return channelMap.get(userId); } }
3.創(chuàng)建MyChannelHandlerPool 通道組池
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * MyChannelHandlerPool * 通道組池,管理所有websocket連接 */ public class MyChannelHandlerPool { private MyChannelHandlerPool(){} public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
4.創(chuàng)建NettyServer 初始化Netty
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * NettyServer Netty服務器配置 */ @Slf4j @Component @SuppressWarnings("all") public class NettyServer { private String url = "/admin/socket"; public NettyServer() {} public void start() throws Exception { // 主事件組 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 執(zhí)行事件組 EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); int port = 12345; serverBootstrap.group(group, bossGroup) // 綁定線程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(port)// 綁定監(jiān)聽端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶端連接時候觸發(fā)操作 @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ObjectEncoder和ObjectDecoder來處理對象的序列化和反序列化 log.info("收到新連接"); //websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器 ch.pipeline().addLast(new HttpServerCodec()); //以塊的方式來寫的處理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketHandler()); ch.pipeline().addLast(new WebSocketServerProtocolHandler(url, null, true, 65536 * 10)); } }); // 綁定端口并同步等待直到綁定完成 ChannelFuture future = serverBootstrap.bind().sync(); log.info(NettyServer.class.getName() + "啟動正在監(jiān)聽: " + future.channel().localAddress()); // 等待服務器通道關(guān)閉 future.channel().closeFuture().sync(); } finally { // 釋放線程池資源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } } }
5. 創(chuàng)建 WebSocketHandler 執(zhí)行任務
@Slf4j @Component @SuppressWarnings("all") public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static final String NETTY_START = "Netty-start"; public WebSocketHandler() {} private ScheduledFuture<?> sendDataTask; @Autowired private DeviceLevelFourService deviceLevelFourService; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加到channelGroup通道組 MyChannelHandlerPool.channelGroup.add(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("與客戶端斷開連接,通道關(guān)閉!"); //添加到channelGroup 通道組 MyChannelHandlerPool.channelGroup.remove(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 首次連接是FullHttpRequest,處理參數(shù) if (msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; String uri = request.uri(); Map<String, String> paramMap = getUrlParams(uri); log.info("接收到的參數(shù)是:" + JSON.toJSONString(paramMap)); // 如果url包含參數(shù),需要處理 if (uri.contains("?")) { String newUri = uri.substring(0, uri.indexOf("?")); log.info(newUri); request.setUri(newUri); } // 當連接建立時,啟動定時任務 sendDataTask = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() { @Override public void run() { try { if (deviceLevelFourService == null) { deviceLevelFourService = SpringContextUtil.getBean(DeviceLevelFourServiceImpl.class); } // 此處可以接收路徑參數(shù) ,直接獲取前端傳遞參數(shù) // "ws://localhost:12345/admin/socket?id=1" String deviceId = paramMap.get("id"); /** * -------此處為自己的數(shù)據(jù)--------- */ // 調(diào)用service 得到前端需要的數(shù)據(jù),用JSON工具類轉(zhuǎn)換推送到前端 List<DeviceNettyData> deviceNettyData = deviceLevelFourService.handlerDeviceData(Long.parseLong(deviceId)); String json = JSON.toJSONString(deviceNettyData, SerializerFeature.WriteMapNullValue); log.info(json); // 將 JSON 字符串封裝為 TextWebSocketFrame TextWebSocketFrame frameNetty = new TextWebSocketFrame(json); ctx.writeAndFlush(frameNetty); // 發(fā)送 WebSocket 幀 } catch (Exception e) { log.error(e.getMessage(), e); } } }, 0, 30, TimeUnit.SECONDS); // 立即開始,每30秒發(fā)送一次 // 調(diào)用父類方法,處理下一個handler super.channelRead(ctx, request); } else if (msg instanceof TextWebSocketFrame frame) { // 正常的TEXT消息類型 sendAllMessage(frame.text()); // 繼續(xù)傳遞給后續(xù)handler super.channelRead(ctx, frame); } else { // 如果消息類型不匹配,記錄警告或處理異常情況 log.error("未處理的消息類型:" + msg.getClass()); super.channelRead(ctx, msg); // 仍然傳遞給后續(xù)處理 } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { log.info(channelHandlerContext.name()); } //讀取完成刷新 @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } //異常則關(guān)閉ChannelHandlerContext連接 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } private void sendAllMessage(String message){ //收到信息后,群發(fā)給所有channel MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message)); } private static Map<String ,String> getUrlParams(String url){ Map<String,String> map = new HashMap<>(); url = url.replace("?",";"); if (!url.contains(";")){ return map; } if (url.split(";").length > 0){ String[] arr = url.split(";")[1].split("&"); for (String s : arr){ String key = s.split("=")[0]; String value = s.split("=")[1]; map.put(key,value); } return map; }else{ return map; } } }
6.創(chuàng)建NettyServerRunner 用來使用新線程調(diào)用NetterServer
@Slf4j public class NettyServerRunner implements Runnable { @Override public void run() { try { new NettyServer().start(); } catch (Exception e) { // 使用Logger進行日志記錄 log.error(e.getMessage(), e); } } }
7. 最后可以在隨意注入到spring容器類中,在項目啟動時候調(diào)用,也可以在其他訪問接口事件調(diào)用
// 方式一 @PostConstruct public void init(){ // 在新的線程中運行Netty服務器 Thread thread = new Thread(new NettyServerRunner()); thread.start(); } // 方式二調(diào)用 @GetMapping("/test/{id}") public R getDevice(@PathVariable Long id){ // 在新的線程中運行Netty服務器 Thread thread = new Thread(new NettyServerRunner()); thread.start(); return deviceService.getDeviceAlarmCount(deviceId); }
注意:
使用新線程時候,spring容器注入的對象為空,容易產(chǎn)生空指針異常,可以借鑒WebSocketHandler 類中方法,重新從spring容器中獲取需要的對象。
總結(jié)
到此這篇關(guān)于利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot定時后端向前端推送數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā)編程之關(guān)鍵字volatile知識總結(jié)
今天帶大家學習java的相關(guān)知識,文章圍繞著Java關(guān)鍵字volatile展開,文中有非常詳細的知識總結(jié),需要的朋友可以參考下2021-06-06IDEA SpringBoot項目配置熱更新的步驟詳解(無需每次手動重啟服務器)
這篇文章主要介紹了IDEA SpringBoot項目配置熱更新的步驟,無需每次手動重啟服務器,本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04springboot整合Mybatis-plus的實現(xiàn)
這篇文章主要介紹了springboot整合Mybatis-plus的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09SpringBoot優(yōu)雅地實現(xiàn)全局異常處理的方法詳解
這篇文章主要為大家詳細介紹了SpringBoot如何優(yōu)雅地實現(xiàn)全局異常處理,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2022-08-08