欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)

 更新時間:2025年01月18日 09:43:28   作者:xiaohong_strive  
這篇文章主要介紹了BIO、NIO、AIO三種Java?IO模型,并探討了如何使用Spring?Boot集成Netty實現(xiàn)后臺向前端推送信息的功能,文中通過代碼介紹的非常詳細,需要的朋友可以參考下

本文介紹BIO,NIO,AIO 及如何使用Spring Boot集成Netty,實現(xiàn)后臺向前端推送信息的功能。利用Spring Boot簡化Netty的集成和配置。

1.BIO,NIO,AIO

BIO、NIO和AIO是Java編程語言中用于處理輸入輸出(IO)操作的三種不同的機制,它們分別代表 同步阻塞I/O,同步非阻塞I/O 和 異步非阻塞I/O。

1.1 BIO

BIO(Blocking IO) 是最傳統(tǒng)的IO模型,也稱為同步阻塞IO。它實現(xiàn)的是同步阻塞模型,即服務器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理。如果這個連接不做任何事情會造成不必要的線程開銷,并且線程在進行IO操作期間是被阻塞的,無法進行其他任務。在高并發(fā)環(huán)境下,BIO的性能較差,因為它需要為每個連接創(chuàng)建一個線程,而且線程切換開銷較大,不過可以通過線程池機制改善。BIO適合一些簡單的、低頻的、短連接的通信場景,例如HTTP請求。

1.2 NIO

NIO是Java 1.4引入的新IO模型,也稱為同步非阻塞IO,它提供了一種基于事件驅(qū)動的方式來處理I/O操作。

相比于傳統(tǒng)的BIO模型,NIO采用了Channel、Buffer和Selector等組件,線程可以對某個IO事件進行監(jiān)聽,并繼續(xù)執(zhí)行其他任務,不需要阻塞等待。當IO事件就緒時,線程會得到通知,然后可以進行相應的操作,實現(xiàn)了非阻塞式的高伸縮性網(wǎng)絡通信。在NIO模型中,數(shù)據(jù)總是從Channel讀入Buffer,或者從Buffer寫入Channel,這種模式提高了IO效率,并且可以充分利用系統(tǒng)資源。

NIO主要由三部分組成:選擇器(Selector)、緩沖區(qū)(Buffer)通道(Channel)。Channel是一個可以進行數(shù)據(jù)讀寫的對象,所有的數(shù)據(jù)都通過Buffer來處理,這種方式避免了直接將字節(jié)寫入通道中,而是將數(shù)據(jù)寫入包含一個或者多個字節(jié)的緩沖區(qū)。在多線程模式下,一個線程可以處理多個請求,這是通過將客戶端的連接請求注冊到多路復用器上,然后由多路復用器輪詢到連接有I/O請求時進行處理。

對于NIO,如果從特性來看,它是非阻塞式IO,N是Non-Blocking的意思;如果從技術(shù)角度,NIO對于BIO來說是一個新技術(shù),N的意思是New的意思。所以NIO也常常被稱作Non-Blocking I/ONew I/O。

NIO適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),例如聊天服務器、彈幕系統(tǒng)、服務器間通訊等。它通過引入非阻塞通道的概念,提高了系統(tǒng)的伸縮性和并發(fā)性能。同時,NIO的使用也簡化了程序編寫,提高了開發(fā)效率。

1.3 AIO

Java AIO(Asynchronous I/O)是Java提供的異步非阻塞IO編程模型,從Java 7版本開始支持,AIO又稱NIO 2.0。

相比于NIO模型,AIO模型更進一步地實現(xiàn)了異步非阻塞IO,提高了系統(tǒng)的并發(fā)性能和伸縮性。在NIO模型中,雖然可以通過多路復用器處理多個連接請求,但仍需要在每個連接上進行讀寫操作,這仍然存在一定的阻塞。而在AIO模型中,所有的IO操作都是異步的,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。

AIO模型有以下特性:

  • 異步能力:AIO模型的最大特性是異步能力,對于socket和I/O操作都有效。讀寫操作都是異步的,完成后會自動調(diào)用回調(diào)函數(shù)。
  • 回調(diào)函數(shù):在AIO模型中,當一個異步操作完成后,會通知相關(guān)線程進行后續(xù)處理,這種處理方式稱為“回調(diào)”?;卣{(diào)函數(shù)可以由開發(fā)者自行定義,用于處理異步操作的結(jié)果。
  • 非阻塞:AIO模型實現(xiàn)了完全的異步非阻塞IO,不會阻塞任何線程,可以更好地利用系統(tǒng)資源。
  • 高性能:由于AIO模型的異步能力和非阻塞特性,它可以更好地處理高并發(fā)、高伸縮性的網(wǎng)絡通信場景,進一步提高系統(tǒng)的性能和效率。
  • 操作系統(tǒng)支持:AIO模型需要操作系統(tǒng)的支持,因此在不同的操作系統(tǒng)上可能會有不同的表現(xiàn)。在Linux內(nèi)核2.6版本之后增加了對真正異步IO的實現(xiàn)

2 Netty原理

2.1 Netty原理

Netty基于Java NIO(非阻塞IO)實現(xiàn),它采用事件驅(qū)動的編程模型,將IO操作抽象為事件,通過事件處理器來處理這些事件。Netty的主要組件包括:

  • Bootstrap:用于啟動客戶端和服務器的引導類
  • Channel:代表IO操作的通道,用于網(wǎng)絡讀寫操作
  • ChannelHandler:用于處理IO事件的事件處理器
  • EventLoopGroup:用于處理IO操作的多線程事件循環(huán)組

3  Spring Boot集成Netty和Websocket

在Spring Boot應用程序中,我們可以通過集成Netty,實現(xiàn)后臺向前端推送信息的功能。首先,我們需要添加Netty依賴,然后在Spring Boot應用程序中創(chuàng)建一個NettyServer類,用于初始化Websocket通道。

1.引入依賴

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
    <version>4.1.111.Final</version>
</dependency>

2.創(chuàng)建 NettyConfig 配置管理所有管道

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("all")
public class NettyConfig {
	/**
	 * 定義全局單例channel組 管理所有channel
	 */
	private static volatile ChannelGroup channelGroup = null;

	/**
	 * 存放請求ID與channel的對應關(guān)系
	 */
	private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

	/**
	 * 定義兩把鎖
	 */
	private static final Object lock1 = new Object();
	private static final Object lock2 = new Object();


	public static ChannelGroup getChannelGroup() {
		if (null == channelGroup) {
			synchronized (lock1) {
				if (null == channelGroup) {
					channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
				}
			}
		}
		return channelGroup;
	}

	public static ConcurrentHashMap<String, Channel> getChannelMap() {
		if (null == channelMap) {
			synchronized (lock2) {
				if (null == channelMap) {
					channelMap = new ConcurrentHashMap<>();
				}
			}
		}
		return channelMap;
	}

	public static Channel getChannel(String userId) {
		if (null == channelMap) {
			return getChannelMap().get(userId);
		}
		return channelMap.get(userId);
	}


}

3.創(chuàng)建MyChannelHandlerPool 通道組池

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * MyChannelHandlerPool
 * 通道組池,管理所有websocket連接
 */
public class MyChannelHandlerPool {

	private MyChannelHandlerPool(){}

	public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


}

4.創(chuàng)建NettyServer 初始化Netty

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * NettyServer Netty服務器配置
 */
@Slf4j
@Component
@SuppressWarnings("all")
public class NettyServer {

	private String url = "/admin/socket";

	public NettyServer() {}

    public void start() throws Exception {
		// 主事件組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
		// 執(zhí)行事件組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
			int port = 12345;
			serverBootstrap.group(group, bossGroup) // 綁定線程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(port)// 綁定監(jiān)聽端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶端連接時候觸發(fā)操作
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加ObjectEncoder和ObjectDecoder來處理對象的序列化和反序列化
							log.info("收到新連接");
                            //websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以塊的方式來寫的處理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            ch.pipeline().addLast(new WebSocketHandler());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler(url, null, true, 65536 * 10));
                        }
                    });
			// 綁定端口并同步等待直到綁定完成
			ChannelFuture future = serverBootstrap.bind().sync();
			log.info(NettyServer.class.getName() + "啟動正在監(jiān)聽: " + future.channel().localAddress());
			// 等待服務器通道關(guān)閉
			future.channel().closeFuture().sync();
        } finally {
			// 釋放線程池資源
            group.shutdownGracefully().sync();
            bossGroup.shutdownGracefully().sync();
        }
    }


}

 5. 創(chuàng)建 WebSocketHandler 執(zhí)行任務

@Slf4j
@Component
@SuppressWarnings("all")
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


	public static final String NETTY_START = "Netty-start";

	public WebSocketHandler() {}

	private ScheduledFuture<?> sendDataTask;

	@Autowired
	private DeviceLevelFourService deviceLevelFourService;

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		//添加到channelGroup通道組
		MyChannelHandlerPool.channelGroup.add(ctx.channel());
	}



	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		log.info("與客戶端斷開連接,通道關(guān)閉!");
		//添加到channelGroup 通道組
		MyChannelHandlerPool.channelGroup.remove(ctx.channel());
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 首次連接是FullHttpRequest,處理參數(shù)
		if (msg instanceof FullHttpRequest) {
			FullHttpRequest request = (FullHttpRequest) msg;
			String uri = request.uri();
			Map<String, String> paramMap = getUrlParams(uri);
			log.info("接收到的參數(shù)是:" + JSON.toJSONString(paramMap));
			// 如果url包含參數(shù),需要處理
			if (uri.contains("?")) {
				String newUri = uri.substring(0, uri.indexOf("?"));
				log.info(newUri);
				request.setUri(newUri);
			}
			// 當連接建立時,啟動定時任務
			sendDataTask = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
				@Override
				public void run() {
					try {
						if (deviceLevelFourService == null) {
							deviceLevelFourService = SpringContextUtil.getBean(DeviceLevelFourServiceImpl.class);
						}
                        // 此處可以接收路徑參數(shù) ,直接獲取前端傳遞參數(shù)
                        // "ws://localhost:12345/admin/socket?id=1"
						String deviceId = paramMap.get("id");
                        /**
                         * -------此處為自己的數(shù)據(jù)---------
                         */
                        // 調(diào)用service 得到前端需要的數(shù)據(jù),用JSON工具類轉(zhuǎn)換推送到前端
						List<DeviceNettyData> deviceNettyData = deviceLevelFourService.handlerDeviceData(Long.parseLong(deviceId));
						String json = JSON.toJSONString(deviceNettyData, SerializerFeature.WriteMapNullValue);
						log.info(json);
						// 將 JSON 字符串封裝為 TextWebSocketFrame
						TextWebSocketFrame frameNetty = new TextWebSocketFrame(json);
						ctx.writeAndFlush(frameNetty); // 發(fā)送 WebSocket 幀
					} catch (Exception e) {
						log.error(e.getMessage(), e);
					}
				}
			}, 0, 30, TimeUnit.SECONDS); // 立即開始,每30秒發(fā)送一次
			// 調(diào)用父類方法,處理下一個handler
			super.channelRead(ctx, request);
		} else if (msg instanceof TextWebSocketFrame frame) {
			// 正常的TEXT消息類型
			sendAllMessage(frame.text());
			// 繼續(xù)傳遞給后續(xù)handler
			super.channelRead(ctx, frame);
		} else {
			// 如果消息類型不匹配,記錄警告或處理異常情況
			log.error("未處理的消息類型:" + msg.getClass());
			super.channelRead(ctx, msg); // 仍然傳遞給后續(xù)處理
		}
	}

	@Override
	protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
		log.info(channelHandlerContext.name());
	}

	//讀取完成刷新
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) {
		ctx.flush();
	}

	//異常則關(guān)閉ChannelHandlerContext連接
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// Close the connection when an exception is raised.
		cause.printStackTrace();
		ctx.close();
	}

	private void sendAllMessage(String message){
		//收到信息后,群發(fā)給所有channel
		MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
	}

	private static Map<String ,String> getUrlParams(String url){
		Map<String,String> map = new HashMap<>();
		url = url.replace("?",";");
		if (!url.contains(";")){
			return map;
		}
		if (url.split(";").length > 0){
			String[] arr = url.split(";")[1].split("&");
			for (String s : arr){
				String key = s.split("=")[0];
				String value = s.split("=")[1];
				map.put(key,value);
			}
			return  map;

		}else{
			return map;
		}
	}



}

6.創(chuàng)建NettyServerRunner 用來使用新線程調(diào)用NetterServer

@Slf4j
public class NettyServerRunner implements Runnable {
	@Override
	public void run() {
		try {
			new NettyServer().start();
		} catch (Exception e) {
			// 使用Logger進行日志記錄
			log.error(e.getMessage(), e);
		}
	}

}

7. 最后可以在隨意注入到spring容器類中,在項目啟動時候調(diào)用,也可以在其他訪問接口事件調(diào)用

// 方式一
@PostConstruct
public void init(){
	// 在新的線程中運行Netty服務器
	Thread thread = new Thread(new NettyServerRunner());
	thread.start();
}




// 方式二調(diào)用
@GetMapping("/test/{id}")
public R getDevice(@PathVariable Long id){
	// 在新的線程中運行Netty服務器
	Thread thread = new Thread(new NettyServerRunner());
	thread.start();
	return deviceService.getDeviceAlarmCount(deviceId);
}

注意:

使用新線程時候,spring容器注入的對象為空,容易產(chǎn)生空指針異常,可以借鑒WebSocketHandler 類中方法,重新從spring容器中獲取需要的對象。

總結(jié)

到此這篇關(guān)于利用Netty+SpringBoot實現(xiàn)定時后端向前端推送數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot定時后端向前端推送數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java并發(fā)編程之關(guān)鍵字volatile知識總結(jié)

    Java并發(fā)編程之關(guān)鍵字volatile知識總結(jié)

    今天帶大家學習java的相關(guān)知識,文章圍繞著Java關(guān)鍵字volatile展開,文中有非常詳細的知識總結(jié),需要的朋友可以參考下
    2021-06-06
  • Java中try catch 的基本用法示例

    Java中try catch 的基本用法示例

    這篇文章主要給大家介紹了關(guān)于Java中try catch 的基本用法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-01-01
  • Springboot如何使用外部yml啟動

    Springboot如何使用外部yml啟動

    這篇文章主要介紹了Springboot如何使用外部yml啟動問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • IDEA SpringBoot項目配置熱更新的步驟詳解(無需每次手動重啟服務器)

    IDEA SpringBoot項目配置熱更新的步驟詳解(無需每次手動重啟服務器)

    這篇文章主要介紹了IDEA SpringBoot項目配置熱更新的步驟,無需每次手動重啟服務器,本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-04-04
  • springboot整合Mybatis-plus的實現(xiàn)

    springboot整合Mybatis-plus的實現(xiàn)

    這篇文章主要介紹了springboot整合Mybatis-plus的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09
  • Spring Boot JPA訪問Mysql示例

    Spring Boot JPA訪問Mysql示例

    本篇文章主要介紹了Spring Boot JPA訪問Mysql示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-03-03
  • java web項目里ehcache.xml介紹

    java web項目里ehcache.xml介紹

    java web項目里ehcache.xml介紹,需要的朋友可以參考一下
    2013-03-03
  • java簡單實現(xiàn)桌球滾動效果

    java簡單實現(xiàn)桌球滾動效果

    這篇文章主要為大家詳細介紹了java簡單實現(xiàn)桌球滾動效果,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-10-10
  • SpringBoot優(yōu)雅地實現(xiàn)全局異常處理的方法詳解

    SpringBoot優(yōu)雅地實現(xiàn)全局異常處理的方法詳解

    這篇文章主要為大家詳細介紹了SpringBoot如何優(yōu)雅地實現(xiàn)全局異常處理,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2022-08-08
  • Springboot通過請求頭獲取當前用戶信息方法詳細示范

    Springboot通過請求頭獲取當前用戶信息方法詳細示范

    這篇文章主要介紹了Springboot通過請求頭獲取當前用戶信息的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧
    2022-11-11

最新評論