SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼
一.導(dǎo)入Netty依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二.搭建websocket服務(wù)器
@Component
public class WebSocketServer {
/**
* 主線程池
*/
private EventLoopGroup bossGroup;
/**
* 工作線程池
*/
private EventLoopGroup workerGroup;
/**
* 服務(wù)器
*/
private ServerBootstrap server;
/**
* 回調(diào)
*/
private ChannelFuture future;
public void start() {
future = server.bind(9001);
System.out.println("netty server - 啟動(dòng)成功");
}
public WebSocketServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketInitializer());
}
}
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ------------------
// 用于支持Http協(xié)議
// ------------------
// websocket基于http協(xié)議,需要有http的編解碼器
pipeline.addLast(new HttpServerCodec());
// 對(duì)寫(xiě)大數(shù)據(jù)流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加對(duì)HTTP請(qǐng)求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用
//設(shè)置單次請(qǐng)求的文件的大小
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
//webSocket 服務(wù)器處理的協(xié)議,用于指定給客戶端連接訪問(wèn)的路由 :/ws
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加Netty空閑超時(shí)檢查的支持
// 1. 讀空閑超時(shí)(超過(guò)一定的時(shí)間會(huì)發(fā)送對(duì)應(yīng)的事件消息)
// 2. 寫(xiě)空閑超時(shí)
// 3. 讀寫(xiě)空閑超時(shí)
pipeline.addLast(new IdleStateHandler(4, 8, 12));
//添加心跳處理
pipeline.addLast(new HearBeatHandler());
// 添加自定義的handler
pipeline.addLast(new ChatHandler());
}
}
四.創(chuàng)建Netty監(jiān)聽(tīng)器
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private WebSocketServer websocketServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(event.getApplicationContext().getParent() == null) {
try {
websocketServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
五.建立消息通道
public class UserChannelMap {
/**
* 用戶保存用戶id與通道的Map對(duì)象
*/
// private static Map<String, Channel> userChannelMap;
/* static {
userChannelMap = new HashMap<String, Channel>();
}*/
/**
* 定義一個(gè)channel組,管理所有的channel
* GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個(gè)單例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放用戶與Chanel的對(duì)應(yīng)信息,用于給指定用戶發(fā)送消息
*/
private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();
private UserChannelMap(){}
/**
* 添加用戶id與channel的關(guān)聯(lián)
* @param userNum
* @param channel
*/
public static void put(String userNum, Channel channel) {
userChannelMap.put(userNum, channel);
}
/**
* 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián)
* @param userNum
*/
public static void remove(String userNum) {
userChannelMap.remove(userNum);
}
/**
* 根據(jù)通道id移除用戶與channel的關(guān)聯(lián)
* @param channelId 通道的id
*/
public static void removeByChannelId(String channelId) {
if(!StringUtils.isNotBlank(channelId)) {
return;
}
for (String s : userChannelMap.keySet()) {
Channel channel = userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())) {
System.out.println("客戶端連接斷開(kāi),取消用戶" + s + "與通道" + channelId + "的關(guān)聯(lián)");
userChannelMap.remove(s);
UserService userService = SpringUtil.getBean(UserService.class);
userService.logout(s);
break;
}
}
}
/**
* 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù)
*/
public static void print() {
for (String s : userChannelMap.keySet()) {
System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id());
}
}
/**
* 根據(jù)好友id獲取對(duì)應(yīng)的通道
* @param receiverNum 接收人編號(hào)
* @return Netty通道
*/
public static Channel get(String receiverNum) {
return userChannelMap.get(receiverNum);
}
/**
* 獲取channel組
* @return
*/
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
/**
* 獲取用戶channel map
* @return
*/
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
return userChannelMap;
}
}
六.自定義消息類型
public class Message {
/**
* 消息類型
*/
private Integer type;
/**
* 聊天消息
*/
private String message;
/**
* 擴(kuò)展消息字段
*/
private Object ext;
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public MarketChatRecord getChatRecord() {
return marketChatRecord;
}
public void setChatRecord(MarketChatRecord chatRecord) {
this.marketChatRecord = chatRecord;
}
public Object getExt() {
return ext;
}
public void setExt(Object ext) {
this.ext = ext;
}
@Override
public String toString() {
return "Message{" +
"type=" + type +
", marketChatRecord=" + marketChatRecord +
", ext=" + ext +
'}';
}
}
七.創(chuàng)建處理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 用來(lái)保存所有的客戶端連接
*/
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
*當(dāng)Channel中有新的事件消息會(huì)自動(dòng)調(diào)用
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 當(dāng)接收到數(shù)據(jù)后會(huì)自動(dòng)調(diào)用
// 獲取客戶端發(fā)送過(guò)來(lái)的文本消息
Gson gson = new Gson();
log.info("服務(wù)器收到消息:{}",msg.text());
System.out.println("接收到消息數(shù)據(jù)為:" + msg.text());
Message message = gson.fromJson(msg.text(), Message.class);
//根據(jù)業(yè)務(wù)要求進(jìn)行消息處理
switch (message.getType()) {
// 處理客戶端連接的消息
case 0:
// 建立用戶與通道的關(guān)聯(lián)
// 處理客戶端發(fā)送好友消息
break;
case 1:
// 處理客戶端的簽收消息
break;
case 2:
// 將消息記錄設(shè)置為已讀
break;
case 3:
// 接收心跳消息
break;
default:
break;
}
}
// 當(dāng)有新的客戶端連接服務(wù)器之后,會(huì)自動(dòng)調(diào)用這個(gè)方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded 被調(diào)用"+ctx.channel().id().asLongText());
// 添加到channelGroup 通道組
UserChannelMap.getChannelGroup().add(ctx.channel());
// clients.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("{異常:}"+cause.getMessage());
// 刪除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
ctx.channel().close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("handlerRemoved 被調(diào)用"+ctx.channel().id().asLongText());
//刪除通道
UserChannelMap.getChannelGroup().remove(ctx.channel());
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
UserChannelMap.print();
}
}
八.處理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
if(idleStateEvent.state() == IdleState.READER_IDLE) {
System.out.println("讀空閑事件觸發(fā)...");
}
else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("寫(xiě)空閑事件觸發(fā)...");
}
else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
System.out.println("---------------");
System.out.println("讀寫(xiě)空閑事件觸發(fā)");
System.out.println("關(guān)閉通道資源");
ctx.channel().close();
}
}
}
}
搭建完成后調(diào)用測(cè)試
1.頁(yè)面訪問(wèn)http://localhost:9001/ws
2.端口號(hào)9001和訪問(wèn)路徑ws都是我們?cè)谏线吪渲玫模缓髠魅胛覀冏远x的消息message類型。
3.大概流程:消息發(fā)送 :用戶1先連接通道,然后發(fā)送消息給用戶2,用戶2若是在線直接可以發(fā)送給用戶,若沒(méi)在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。
消息推送 :用戶1連接通道,根據(jù)通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。
到此這篇關(guān)于SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty WebSocket消息發(fā)送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot+hutool批量生成二維碼壓縮導(dǎo)出功能
這篇文章主要介紹了springboot+hutool批量生成二維碼壓縮導(dǎo)出功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-10-10
mybatis 返回Integer,Double,String等類型的數(shù)據(jù)操作
這篇文章主要介紹了mybatis 返回Integer,Double,String等類型的數(shù)據(jù)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11
springboot 事件監(jiān)聽(tīng)的實(shí)現(xiàn)方法
這篇文章主要介紹了springboot 事件監(jiān)聽(tīng)的實(shí)現(xiàn)方法,并詳細(xì)的介紹了四種監(jiān)聽(tīng)方式,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-04-04
java如何刪除以逗號(hào)隔開(kāi)的字符串中某一個(gè)值
這篇文章主要介紹了java如何刪除以逗號(hào)隔開(kāi)的字符串中某一個(gè)值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06
基于SpringCloud手寫(xiě)一個(gè)簡(jiǎn)易版Sentinel
SpringCloud Alibaba Sentinel是當(dāng)前最為流行一種熔斷降級(jí)框架,簡(jiǎn)單易用的方式可以快速幫助我們實(shí)現(xiàn)服務(wù)的限流和降級(jí),保證服務(wù)的穩(wěn)定性。2021-05-05
spring session同域下單點(diǎn)登錄實(shí)現(xiàn)解析
這篇文章主要介紹了spring session同域下單點(diǎn)登錄實(shí)現(xiàn)解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10

