使用Netty快速實(shí)現(xiàn)一個(gè)群聊功能的示例詳解
前言
通過之前的文章介紹,我們可以深刻認(rèn)識(shí)到Netty在網(wǎng)絡(luò)編程領(lǐng)域的卓越表現(xiàn)和強(qiáng)大實(shí)力。這篇文章將介紹如何利用 Netty 框架開發(fā)一個(gè) WebSocket 服務(wù)端,從而實(shí)現(xiàn)一個(gè)簡單的在線聊天功能。
聲明
文章中所提供的代碼僅供參考,旨在幫助無 Netty 經(jīng)驗(yàn)的開發(fā)人員快速上手。請注意,這些代碼并不適用于實(shí)際應(yīng)用中。
功能說明
聊天頁面:
- 用戶進(jìn)入頁面后,會(huì)看到一個(gè)簡單的文本框,可以用來發(fā)送消息。
- 頁面下方會(huì)顯示聊天的消息內(nèi)容。
服務(wù)端主要有以下三個(gè)功能:
- 響應(yīng)聊天頁面:用來接收和響應(yīng)聊天頁面的請求。
- 處理消息:對接收到的消息進(jìn)行處理。
- 實(shí)現(xiàn)群聊功能:提供群聊的功能,使多個(gè)用戶能夠在同一個(gè)聊天室中進(jìn)行交流。
功能很簡單,但是可以通過這個(gè)示例實(shí)現(xiàn)更多復(fù)雜的場景。
實(shí)現(xiàn)步驟
創(chuàng)建一個(gè)簡單的 Maven 項(xiàng)目,直接引入 netty-all 包即可編碼。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
實(shí)現(xiàn)該功能共有五個(gè)類,如下:
├── MakeIndexPage.java
├── ProcessWsIndexPageHandler.java
├── ProcesssWsFrameHandler.java
├── WebSocketServer.java
└── WebSocketServerInitializer.java
下面對實(shí)現(xiàn)該功能所涉及的五個(gè)類的代碼進(jìn)行詳細(xì)說明
WebSocket 服務(wù)啟動(dòng)
這個(gè)類是一個(gè)基于 Netty 啟動(dòng)的常規(guī)服務(wù)端。它包含了一些配置項(xiàng),包括 Reactor 模式、IO 類型以及消息處理配置,大部分都是這樣。代碼如下:
/**
* 類說明:
*/
public final class WebSocketServer {
/*創(chuàng)建 DefaultChannelGroup,用來保存所
有已經(jīng)連接的 WebSocket Channel,群發(fā)和一對一功能可以用上*/
private final static ChannelGroup channelGroup =
new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
static final boolean SSL = false;//是否啟用ssl
/*通過ssl訪問端口為8443,否則為8080*/
static final int PORT
= Integer.parseInt(
System.getProperty("port", SSL? "8443" : "80"));
public static void main(String[] args) throws Exception {
/*SSL配置*/
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(),
ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(sslCtx,channelGroup));
Channel ch = b.bind(PORT).sync().channel();
System.out.println("打開瀏覽器訪問: " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Channel 初始化
這個(gè)類的主要功能是創(chuàng)建了一個(gè) ChannelInitializer,用于初始化 ChannelPipeline,并添加了一些通道處理器。這些處理器包括由Netty提供的處理SSL協(xié)議、處理HTTP協(xié)議和支持WebSocket協(xié)議的功能,還有一些由業(yè)務(wù)自定義的處理器,用于處理頁面展示和處理WebSocket數(shù)據(jù)。代碼如下:
/**
* 類說明:增加handler
*/
public class WebSocketServerInitializer
extends ChannelInitializer<SocketChannel> {
private final ChannelGroup group;
/*websocket訪問路徑*/
private static final String WEBSOCKET_PATH = "/chat";
private final SslContext sslCtx;
public WebSocketServerInitializer(SslContext sslCtx,ChannelGroup group) {
this.sslCtx = sslCtx;
this.group = group;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
/*增加對http的支持*/
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
/*Netty提供,支持WebSocket應(yīng)答數(shù)據(jù)壓縮傳輸*/
pipeline.addLast(new WebSocketServerCompressionHandler());
/*Netty提供,對整個(gè)websocket的通信進(jìn)行了初始化(發(fā)現(xiàn)http報(bào)文中有升級為websocket的請求)
,包括握手,以及以后的一些通信控制*/
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,
null, true));
/*瀏覽器訪問時(shí)展示index頁面*/
pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH));
/*對websocket的數(shù)據(jù)進(jìn)行處理*/
pipeline.addLast(new ProcesssWsFrameHandler(group));
}
}
HTTP 請求處理
這個(gè)類的主要功能是在收到 HTTP 請求時(shí),當(dāng) URI 為“/”或“/index.html”時(shí),會(huì)返回一個(gè)聊天界面作為響應(yīng)。代碼如下:
/**
* 類說明:對http請求,將index的頁面返回給前端
*/
public class ProcessWsIndexPageHandler
extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String websocketPath;
public ProcessWsIndexPageHandler(String websocketPath) {
this.websocketPath = websocketPath;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx,
FullHttpRequest req) throws Exception {
// 處理錯(cuò)誤或者無法解析的http請求
if (!req.decoderResult().isSuccess()) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
}
//只允許Get請求
if (req.method() != GET) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
return;
}
// 發(fā)送index頁面的內(nèi)容
if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) {
//生成WebSocket的訪問地址,寫入index頁面中
String webSocketLocation
= getWebSocketLocation(ctx.pipeline(), req,
websocketPath);
System.out.println("WebSocketLocation:["+webSocketLocation+"]");
//生成index頁面的具體內(nèi)容,并送往瀏覽器
ByteBuf content
= MakeIndexPage.getContent(
webSocketLocation);
FullHttpResponse res = new DefaultFullHttpResponse(
HTTP_1_1, OK, content);
res.headers().set(HttpHeaderNames.CONTENT_TYPE,
"text/html; charset=UTF-8");
HttpUtil.setContentLength(res, content.readableBytes());
sendHttpResponse(ctx, req, res);
} else {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
/*發(fā)送應(yīng)答*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req,
FullHttpResponse res) {
// 錯(cuò)誤的請求進(jìn)行處理 (code<>200).
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}
// 發(fā)送應(yīng)答.
ChannelFuture f = ctx.channel().writeAndFlush(res);
//對于不是長連接或者錯(cuò)誤的請求直接關(guān)閉連接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/*根據(jù)用戶的訪問,告訴用戶的瀏覽器,WebSocket的訪問地址*/
private static String getWebSocketLocation(ChannelPipeline cp,
HttpRequest req,
String path) {
String protocol = "ws";
if (cp.get(SslHandler.class) != null) {
protocol = "wss";
}
return protocol + "://" + req.headers().get(HttpHeaderNames.HOST)
+ path;
}
}
HTTP 頁面內(nèi)容
這個(gè)類的主要目的是生成一個(gè)包含消息發(fā)送框和內(nèi)容展示功能的HTML頁面,并實(shí)現(xiàn)WebSocket的相關(guān)功能,包括建立連接、向服務(wù)端發(fā)送消息以及接收服務(wù)端的響應(yīng)。當(dāng)然,也可以單獨(dú)寫一個(gè)HTML文件。代碼如下:
/**
* 類說明:生成index頁面的內(nèi)容
*/
public final class MakeIndexPage {
private static final String NEWLINE = "\r\n";
public static ByteBuf getContent(String webSocketLocation) {
return Unpooled.copiedBuffer(
"<html><head><title>Web Socket Test</title><meta charset=\"utf-8\" /></head>"
+ NEWLINE +
"<body>" + NEWLINE +
"<script type=\"text/javascript\">" + NEWLINE +
"var socket;" + NEWLINE +
"if (!window.WebSocket) {" + NEWLINE +
" window.WebSocket = window.MozWebSocket;" + NEWLINE +
'}' + NEWLINE +
"if (window.WebSocket) {" + NEWLINE +
" socket = new WebSocket(\"" + webSocketLocation + "\");"
+ NEWLINE +
" socket.onmessage = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = ta.value + '\\n' + event.data" + NEWLINE +
" };" + NEWLINE +
" socket.onopen = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = \"Web Socket opened!\";" + NEWLINE +
" };" + NEWLINE +
" socket.onclose = function(event) {" + NEWLINE +
" var ta = document.getElementById('responseText');"
+ NEWLINE +
" ta.value = ta.value + \"Web Socket closed\"; "
+ NEWLINE +
" };" + NEWLINE +
"} else {" + NEWLINE +
" alert(\"Your browser does not support Web Socket.\");"
+ NEWLINE +
'}' + NEWLINE +
NEWLINE +
"function send(message) {" + NEWLINE +
" if (!window.WebSocket) { return; }" + NEWLINE +
" if (socket.readyState == WebSocket.OPEN) {" + NEWLINE +
" socket.send(message);" + NEWLINE +
" } else {" + NEWLINE +
" alert(\"The socket is not open.\");" + NEWLINE +
" }" + NEWLINE +
'}' + NEWLINE +
"</script>" + NEWLINE +
"<form onsubmit=\"return false;\">" + NEWLINE +
"<input type=\"text\" name=\"message\" " +
"value=\"Hi, 你好啊\"/>" +
"<input type=\"button\" value=\"發(fā)送\""
+ NEWLINE +
" onclick=\"send(this.form.message.value)\" />"
+ NEWLINE +
"<h3>消息內(nèi)容</h3>" + NEWLINE +
"<textarea id=\"responseText\" " +
"style=\"width:500px;height:300px;\"></textarea>"
+ NEWLINE +
"</form>" + NEWLINE +
"</body>" + NEWLINE +
"</html>" + NEWLINE, CharsetUtil.UTF_8);
}
}
WebSocket 請求處理
這個(gè)類的主要功能是處理與 Channel 相關(guān)的事件。例如,當(dāng)一個(gè) Channel 連接成功時(shí),會(huì)將該 Channel 添加到一個(gè) ChannelGroup 中。當(dāng)接收到該 Channel 的數(shù)據(jù)時(shí),可以通過向 ChannelGroup 寫入數(shù)據(jù)來實(shí)現(xiàn)群聊效果。代碼如下
/**
* 類說明:對websocket的數(shù)據(jù)進(jìn)行處理
*/
public class ProcesssWsFrameHandler
extends SimpleChannelInboundHandler<WebSocketFrame> {
private final ChannelGroup group;
public ProcesssWsFrameHandler(ChannelGroup group) {
this.group = group;
}
private static final Logger logger
= LoggerFactory.getLogger(ProcesssWsFrameHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
WebSocketFrame frame) throws Exception {
//判斷是否為文本幀,目前只處理文本幀
if (frame instanceof TextWebSocketFrame) {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
logger.info("{} received {}", ctx.channel(), request);
// ctx.channel().writeAndFlush(
// new TextWebSocketFrame(request.toUpperCase(Locale.CHINA)));
/*群發(fā)實(shí)現(xiàn):一對一道理一樣*/
group.writeAndFlush(new TextWebSocketFrame(
ctx.channel().remoteAddress() + " :" + request.toUpperCase(Locale.CHINA)));
} else {
String message = "unsupported frame type: "
+ frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}
/*重寫 userEventTriggered()方法以處理自定義事件*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx,
Object evt) throws Exception {
/*檢測事件,如果是握手成功事件,做點(diǎn)業(yè)務(wù)處理*/
if (evt == WebSocketServerProtocolHandler
.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
//通知所有已經(jīng)連接的 WebSocket 客戶端新的客戶端已經(jīng)連接上了
group.writeAndFlush(new TextWebSocketFrame(
"Client " + ctx.channel().remoteAddress() + " joined"));
//將新的 WebSocket Channel 添加到 ChannelGroup 中,
// 以便它可以接收到所有的消息
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
}
效果展示
服務(wù)端啟動(dòng)

聊天頁面1

聊天頁面2

總結(jié)
總的來說,基于 Netty 實(shí)現(xiàn)一個(gè) WebSocket 功能是非常方便且高效的,但是我們需要知其所以然,要理解 Websocket 協(xié)議,也要懂的在 Netty 中,通過添加 ChannelHandler 來處理各種異常情況,例如握手失敗、連接關(guān)閉等,當(dāng)然,還要考慮安全性問題,例如處理跨站腳本攻擊(XSS)、防止惡意數(shù)據(jù)傳輸?shù)取?/p>
以上就是使用Netty快速實(shí)現(xiàn)一個(gè)群聊功能的示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Netty實(shí)現(xiàn)群聊的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解Java的call by value和call by reference
在本篇文章里小編給大家總結(jié)了關(guān)于Java的call by value和call by reference的相關(guān)用法和知識(shí)點(diǎn)內(nèi)容,需要的朋友們學(xué)習(xí)下。2019-03-03
@valid 無法觸發(fā)BindingResult的解決
這篇文章主要介紹了@valid 無法觸發(fā)BindingResult的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Springboot 如何使用 SaToken 進(jìn)行登錄認(rèn)證、權(quán)限管理及路由規(guī)則接口攔截
Sa-Token 是一個(gè)輕量級 Java 權(quán)限認(rèn)證框架,主要解決:登錄認(rèn)證、權(quán)限認(rèn)證、單點(diǎn)登錄、OAuth2.0、分布式Session會(huì)話、微服務(wù)網(wǎng)關(guān)鑒權(quán) 等一系列權(quán)限相關(guān)問題,這篇文章主要介紹了Springboot 使用 SaToken 進(jìn)行登錄認(rèn)證、權(quán)限管理以及路由規(guī)則接口攔截,需要的朋友可以參考下2024-06-06
詳解Java的MyBatis框架和Spring框架的整合運(yùn)用
在Web端的SSH框架整合中Spring主要負(fù)責(zé)數(shù)據(jù)庫處理,而引入MyBatis后二者的集成使用效果更佳,下面我們就來詳解Java的MyBatis框架和Spring框架的整合運(yùn)用2016-06-06
Java適配器模式_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java適配器模式,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-07-07

