Java Netty實(shí)現(xiàn)心跳機(jī)制過程解析
netty心跳機(jī)制示例,使用Netty實(shí)現(xiàn)心跳機(jī)制,使用netty4,IdleStateHandler 實(shí)現(xiàn)。Netty心跳機(jī)制,netty心跳檢測(cè),netty,心跳
本文假設(shè)你已經(jīng)了解了Netty的使用,或者至少寫過netty的helloworld,知道了netty的基本使用。我們知道使用netty的時(shí)候,大多數(shù)的東西都與Handler有關(guān),我們的業(yè)務(wù)邏輯基本都是在Handler中實(shí)現(xiàn)的。Netty中自帶了一個(gè)IdleStateHandler 可以用來(lái)實(shí)現(xiàn)心跳檢測(cè)。
心跳檢測(cè)的邏輯
本文中我們將要實(shí)現(xiàn)的心跳檢測(cè)邏輯是這樣的:服務(wù)端啟動(dòng)后,等待客戶端連接,客戶端連接之后,向服務(wù)端發(fā)送消息。如果客戶端在“干活”那么服務(wù)端必定會(huì)收到數(shù)據(jù),如果客戶端“閑下來(lái)了”那么服務(wù)端就接收不到這個(gè)客戶端的消息,既然客戶端閑下來(lái)了,不干事,那么何必浪費(fèi)連接資源呢?所以服務(wù)端檢測(cè)到一定時(shí)間內(nèi)客戶端不活躍的時(shí)候,將客戶端連接關(guān)閉。本文要實(shí)現(xiàn)的邏輯步驟為:
- 啟動(dòng)服務(wù)端,啟動(dòng)客戶端
- 客戶端向服務(wù)端發(fā)送"I am alive",并sleep隨機(jī)時(shí)間,用來(lái)模擬空閑。
- 服務(wù)端接收客戶端消息,并返回"copy that",客戶端空閑時(shí) 計(jì)數(shù)+1.
- 服務(wù)端客戶端繼續(xù)通信
- 服務(wù)端檢測(cè)客戶端空閑太多,關(guān)閉連接??蛻舳税l(fā)現(xiàn)連接關(guān)閉了,就退出了。
有了這個(gè)思路,我們先來(lái)編寫服務(wù)端。
心跳檢測(cè)服務(wù)端代碼
public class HeartBeatServer {
int port ;
public HeartBeatServer(int port){
this.port = port;
}
public void start(){
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try{
bootstrap.group(boss,worker)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatInitializer());
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
HeartBeatServer server = new HeartBeatServer(8090);
server.start();
}
}
熟悉netty的同志,對(duì)于上面的模板一樣的代碼一定是在熟悉不過了。啥都不用看,只需要看childHandler(new HeartBeatInitializer()) 這一句。HeartBeatInitializer就是一個(gè)ChannelInitializer顧名思義,他就是在初始化channel的時(shí)做一些事情。我們所需要開發(fā)的業(yè)務(wù)邏輯Handler就是在這里添加的。其代碼如下:
public class HeartBeatInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS));
pipeline.addLast(new HeartBeatHandler());
}
}
代碼很簡(jiǎn)單,我們先添加了StringDecoder,和StringEncoder。這兩個(gè)其實(shí)就是編解碼用的,下面的IdleStateHandler才是本次心跳的核心組件。我們可以看到IdleStateHandler的構(gòu)造函數(shù)中接收了4個(gè)參數(shù),其定義如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
三個(gè)空閑時(shí)間參數(shù),以及時(shí)間參數(shù)的格式。我們的例子中設(shè)置的是2,2,2,意思就是客戶端2秒沒有讀/寫,這個(gè)超時(shí)時(shí)間就會(huì)被觸發(fā)。超時(shí)事件觸發(fā)就需要我們來(lái)處理了,這就是上的HeartBeatInitializer中最后一行的HeartBeatHandler所做的事情。代碼如下:
public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if("I am alive".equals(s)){
ctx.channel().writeAndFlush("copy that");
}else {
System.out.println(" 其他信息處理 ... ");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "讀空閑";
readIdleTimes ++; // 讀空閑的計(jì)數(shù)加1
break;
case WRITER_IDLE:
eventType = "寫空閑";
// 不處理
break;
case ALL_IDLE:
eventType ="讀寫空閑";
// 不處理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超時(shí)事件:" +eventType);
if(readIdleTimes > 3){
System.out.println(" [server]讀空閑超過3次,關(guān)閉連接");
ctx.channel().writeAndFlush("you are out");
ctx.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
至此,我們的服務(wù)端寫好了。
心跳檢測(cè)客戶端代碼
netty的api設(shè)計(jì)使得編碼的模式非常具有通用性,所以客戶端代碼和服務(wù)端的代碼幾乎一樣:?jiǎn)?dòng)client端的代碼幾乎一樣,也需要一個(gè)ChannelInitializer,也需要Handler。改動(dòng)的地方很少,因此本文不對(duì)客戶端代碼進(jìn)行詳細(xì)解釋。下面給出client端的完整代碼:
public class HeartBeatClient {
int port;
Channel channel;
Random random ;
public HeartBeatClient(int port){
this.port = port;
random = new Random();
}
public static void main(String[] args) throws Exception{
HeartBeatClient client = new HeartBeatClient(8090);
client.start();
}
public void start() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new HeartBeatClientInitializer());
connect(bootstrap,port);
String text = "I am alive";
while (channel.isActive()){
sendMsg(text);
}
}catch(Exception e){
// do something
}finally {
eventLoopGroup.shutdownGracefully();
}
}
public void connect(Bootstrap bootstrap,int port) throws Exception{
channel = bootstrap.connect("localhost",8090).sync().channel();
}
public void sendMsg(String text) throws Exception{
int num = random.nextInt(10);
Thread.sleep(num * 1000);
channel.writeAndFlush(text);
}
static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" +msg);
if(msg!= null && msg.equals("you are out")) {
System.out.println(" server closed connection , so client will close too");
ctx.channel().closeFuture();
}
}
}
}
運(yùn)行代碼
在上面的代碼寫好之后,我們先啟動(dòng)服務(wù)端,然后在啟動(dòng)客戶端。運(yùn)行日志如下:
server端:
=== /127.0.0.1:57700 is active === ====== > [server] message received : I am alive ====== > [server] message received : I am alive /127.0.0.1:57700超時(shí)事件:寫空閑 /127.0.0.1:57700超時(shí)事件:讀空閑 /127.0.0.1:57700超時(shí)事件:讀寫空閑 /127.0.0.1:57700超時(shí)事件:寫空閑 /127.0.0.1:57700超時(shí)事件:讀空閑 /127.0.0.1:57700超時(shí)事件:讀寫空閑 /127.0.0.1:57700超時(shí)事件:寫空閑 ====== > [server] message received : I am alive /127.0.0.1:57700超時(shí)事件:寫空閑 /127.0.0.1:57700超時(shí)事件:讀寫空閑 /127.0.0.1:57700超時(shí)事件:讀空閑 /127.0.0.1:57700超時(shí)事件:寫空閑 /127.0.0.1:57700超時(shí)事件:讀寫空閑 /127.0.0.1:57700超時(shí)事件:讀空閑 [server]讀空閑超過3次,關(guān)閉連接
client端:
client sent msg and sleep 2 client received :copy that client received :copy that client sent msg and sleep 6 client sent msg and sleep 6 client received :copy that client received :you are out server closed connection , so client will close too Process finished with exit code 0
通過上面的運(yùn)行日志,我們可以看到:
1.客戶端在與服務(wù)器成功建立之后,發(fā)送了3次'I am alive',服務(wù)端也回應(yīng)了3次:'copy that'
2.由于客戶端消極怠工,超時(shí)了多次,服務(wù)端關(guān)閉了鏈接。
3.客戶端知道服務(wù)端拋棄自己之后,也關(guān)閉了連接,程序退出。
以上簡(jiǎn)單了演示了一下,netty的心跳機(jī)制,其實(shí)主要就是使用了IdleStateHandler。源碼下載
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java多線程編程之使用thread類創(chuàng)建線程
在Java中創(chuàng)建線程有兩種方法:使用Thread類和使用Runnable接口。在使用Runnable接口時(shí)需要建立一個(gè)Thread實(shí)例2014-01-01
一文看懂springboot實(shí)現(xiàn)短信服務(wù)功能
項(xiàng)目中的短信服務(wù)基本上上都會(huì)用到,簡(jiǎn)單的注冊(cè)驗(yàn)證碼,消息通知等等都會(huì)用到。這篇文章主要介紹了springboot 實(shí)現(xiàn)短信服務(wù)功能,需要的朋友可以參考下2019-10-10
Spring?Boot實(shí)現(xiàn)MyBatis動(dòng)態(tài)創(chuàng)建表的操作語(yǔ)句
這篇文章主要介紹了Spring?Boot實(shí)現(xiàn)MyBatis動(dòng)態(tài)創(chuàng)建表,MyBatis提供了動(dòng)態(tài)SQL,我們可以通過動(dòng)態(tài)SQL,傳入表名等信息然組裝成建表和操作語(yǔ)句,本文通過案例講解展示我們的設(shè)計(jì)思路,需要的朋友可以參考下2024-01-01
idea快捷鍵生成getter和setter,有構(gòu)造參數(shù),無(wú)構(gòu)造參數(shù),重寫toString方式
這篇文章主要介紹了java之idea快捷鍵生成getter和setter,有構(gòu)造參數(shù),無(wú)構(gòu)造參數(shù),重寫toString方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11

