JAVA Netty實(shí)現(xiàn)聊天室+私聊功能的示例代碼
功能介紹
使用Netty框架實(shí)現(xiàn)聊天室功能,服務(wù)器可監(jiān)控客戶端上下限狀態(tài),消息轉(zhuǎn)發(fā)。同時(shí)實(shí)現(xiàn)了點(diǎn)對點(diǎn)私聊功能。技術(shù)點(diǎn)我都在代碼中做了備注,這里不再重復(fù)寫了。希望能給想學(xué)習(xí)netty的同學(xué)一點(diǎn)參考。
服務(wù)器代碼
服務(wù)器入口代碼
package nio.test.netty.groupChat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* netty群聊 服務(wù)器端
* @author zhang
*
*/
public class NettyChatServer {
private int port;
public NettyChatServer(int port){
this.port = port;
}
//初始化 netty服務(wù)器
private void init() throws Exception{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(16);
try {
ServerBootstrap boot = new ServerBootstrap();
boot.group(boss,work);
boot.channel(NioServerSocketChannel.class);//設(shè)置boss selector建立channel使用的對象
boot.option(ChannelOption.SO_BACKLOG, 128);//boss 等待連接的 隊(duì)列長度
boot.childOption(ChannelOption.SO_KEEPALIVE, true); //讓客戶端保持長期活動(dòng)狀態(tài)
boot.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//從channel中獲取pipeline 并往里邊添加Handler
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast(new ServerMessageHandler());//自定義Handler來處理消息
}
});
System.out.println("服務(wù)器開始啟動(dòng)...");
//綁定端口
ChannelFuture channelFuture = boot.bind(port).sync();
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
if(future.isSuccess()){
System.out.println("服務(wù)器正在啟動(dòng)...");
}
if(future.isDone()){
System.out.println("服務(wù)器啟動(dòng)成功...OK");
}
}
});
//監(jiān)聽channel關(guān)閉
channelFuture.channel().closeFuture().sync();
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
if(future.isCancelled()){
System.out.println("服務(wù)器正在關(guān)閉..");
}
if(future.isCancellable()){
System.out.println("服務(wù)器已經(jīng)關(guān)閉..OK");
}
}
});
}finally{
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
/**
* 啟動(dòng)服務(wù)器 main 函數(shù)
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
new NettyChatServer(9090).init();
}
}
服務(wù)器端消息處理Handler
package nio.test.netty.groupChat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 自定義 服務(wù)器端消息處理Handler
* @author zhang
*
*/
public class ServerMessageHandler extends SimpleChannelInboundHandler<String>{
/**
* 管理全局的channel
* GlobalEventExecutor.INSTANCE 全局事件監(jiān)聽器
* 一旦將channel 加入 ChannelGroup 就不要用手動(dòng)去
* 管理channel的連接失效后移除操作,他會(huì)自己移除
*/
private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 為了實(shí)現(xiàn)私聊功能,這里key存儲用戶的唯一標(biāo)識,
* 我保存 客戶端的端口號
* 當(dāng)然 這個(gè)集合也需要自己去維護(hù) 用戶的上下線 不能像 ChannelGroup那樣自己去維護(hù)
*/
private static Map<String,Channel> all = new HashMap<String,Channel>();
private SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 處理收到的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
Channel channel = ctx.channel();
/**
* 這里簡單判斷 如果內(nèi)容里邊包含#那么就是私聊
*/
if(msg.contains("#")){
String id = msg.split("#")[0];
String body = msg.split("#")[1];
Channel userChannel = all.get(id);
String key = channel.remoteAddress().toString().split(":")[1];
userChannel.writeAndFlush(sf.format(new Date())+"\n 【用戶】 "+key+" 說 : "+body);
return;
}
//判斷當(dāng)前消息是不是自己發(fā)送的
for(Channel c : channels){
String addr = c.remoteAddress().toString();
if(channel !=c){
c.writeAndFlush(sf.format(new Date())+"\n 【用戶】 "+addr+" 說 : "+msg);
}else{
c.writeAndFlush(sf.format(new Date())+"\n 【自己】 "+addr+" 說 : "+msg);
}
}
}
/**
* 建立連接以后第一個(gè)調(diào)用的方法
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String addr = channel.remoteAddress().toString();
/**
* 這里 ChannelGroup 底層封裝會(huì)遍歷給所有的channel發(fā)送消息
*
*/
channels.writeAndFlush(sf.format(new Date())+"\n 【用戶】 "+addr+" 加入聊天室 ");
channels.add(channel);
String key = channel.remoteAddress().toString().split(":")[1];
all.put(key, channel);
}
/**
* channel連接狀態(tài)就緒以后調(diào)用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String addr = ctx.channel().remoteAddress().toString();
System.out.println(sf.format(new Date())+" \n【用戶】 "+addr+" 上線 ");
}
/**
* channel連接狀態(tài)斷開后觸發(fā)
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String addr = ctx.channel().remoteAddress().toString();
System.out.println(sf.format(new Date())+" \n【用戶】 "+addr+" 下線 ");
//下線移除
String key = ctx.channel().remoteAddress().toString().split(":")[1];
all.remove(key);
}
/**
* 連接發(fā)生異常時(shí)觸發(fā)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//System.out.println("連接發(fā)生異常!");
ctx.close();
}
/**
* 斷開連接會(huì)觸發(fā)該消息
* 同時(shí)當(dāng)前channel 也會(huì)自動(dòng)從ChannelGroup中被移除
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String addr = channel.remoteAddress().toString();
/**
* 這里 ChannelGroup 底層封裝會(huì)遍歷給所有的channel發(fā)送消息
*
*/
channels.writeAndFlush(sf.format(new Date())+"\n 【用戶】 "+addr+" 離開了 ");
//打印 ChannelGroup中的人數(shù)
System.out.println("當(dāng)前在線人數(shù)是:"+channels.size());
System.out.println("all:"+all.size());
}
}
客戶端主方法代碼
package nio.test.netty.groupChat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Scanner;
public class NettyChatClient {
private String ip;
private int port;
public NettyChatClient(String ip,int port){
this.ip = ip;
this.port = port;
}
/**
* 初始化客戶
*/
private void init() throws Exception{
//創(chuàng)建監(jiān)聽事件的監(jiān)聽器
EventLoopGroup work = new NioEventLoopGroup();
try {
Bootstrap boot = new Bootstrap();
boot.group(work);
boot.channel(NioSocketChannel.class);
boot.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast(new ClientMessageHandler());
}
});
ChannelFuture channelFuture = boot.connect(ip, port).sync();
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
if(future.isSuccess()){
System.out.println("客戶端啟動(dòng)中...");
}
if(future.isDone()){
System.out.println("客戶端啟動(dòng)成功...OK!");
}
}
});
System.out.println(channelFuture.channel().localAddress().toString());
System.out.println("#################################################");
System.out.println("~~~~~~~~~~~~~~端口號#消息內(nèi)容~~這樣可以給單獨(dú)一個(gè)用戶發(fā)消息~~~~~~~~~~~~~~~~~~");
System.out.println("#################################################");
/**
* 這里用控制臺輸入數(shù)據(jù)
*/
Channel channel = channelFuture.channel();
//獲取channel
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String str = scanner.nextLine();
channel.writeAndFlush(str+"\n");
}
channelFuture.channel().closeFuture().sync();
scanner.close();
} finally {
work.shutdownGracefully();
}
}
/**
* 主方法入口
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
new NettyChatClient("127.0.0.1",9090).init();
}
}
客戶端消息處理Handler
package nio.test.netty.groupChat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 客戶點(diǎn)消息處理 Handler
* @author zhang
*
*/
public class ClientMessageHandler extends SimpleChannelInboundHandler<String> {
/**
* 處理收到的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
System.out.println(msg);
}
/**
* 連接異常后觸發(fā)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
測試結(jié)果
啟動(dòng)了四個(gè)客戶端 服務(wù)器端日志效果如下:

客戶端一端日志:

客戶端二日志:

客戶端三日志:

客戶端四日志:

現(xiàn)在在客戶端四發(fā)送消息:

每個(gè)客戶端都可以收到消息:



軟化關(guān)閉客戶端客戶端三:
服務(wù)器日志:

其他客戶端日志:



發(fā)送私聊消息:

這個(gè)客戶端收不到消息


到此這篇關(guān)于JAVA Netty實(shí)現(xiàn)聊天室+私聊功能的示例代碼的文章就介紹到這了,更多相關(guān)JAVA Netty聊天室內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Hibernate中5個(gè)核心接口知識點(diǎn)整理
在本篇文章里小編給大家整理的是一篇關(guān)于Hibernate中5個(gè)核心接口知識點(diǎn)整理等內(nèi)容,有興趣的朋友們跟著學(xué)習(xí)參考下。2021-08-08
Hibernate中實(shí)現(xiàn)增刪改查的步驟詳解
本篇文章主要介紹了Hibernate中實(shí)現(xiàn)增刪改查的步驟與方法,具有很好的參考價(jià)值,下面跟著小編一起來看下吧2017-02-02
Java實(shí)現(xiàn)多線程中的靜態(tài)代理模式
靜態(tài)代理屬于設(shè)計(jì)模式中的代理模式。這篇文章主要介紹了Java實(shí)現(xiàn)多線程中的靜態(tài)代理模式,詳細(xì)的介紹了靜態(tài)代理的使用,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
Spring?Data?JPA?注解Entity關(guān)聯(lián)關(guān)系使用詳解
這篇文章主要為大家介紹了Spring?Data?JPA?注解Entity關(guān)聯(lián)關(guān)系使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Java發(fā)送郵件javax.mail的實(shí)現(xiàn)方法
這篇文章主要為大家介紹了Java發(fā)送郵件javax.mail的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,代碼都有詳細(xì)的注釋,感興趣的小伙伴們可以參考一下2016-01-01
Presto自定義函數(shù)@SqlNullable引發(fā)問題詳解
這篇文章主要為大家介紹了Presto自定義函數(shù)@SqlNullable引發(fā)問題詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12

