Netty的心跳檢測解析
一、網(wǎng)絡(luò)連接假死現(xiàn)象
客戶端的心跳檢測對于任何長連接的應(yīng)用來說,都是一個非?;A(chǔ)的功能。
要理解心跳的重要性,首先需要從網(wǎng)絡(luò)連接假死的現(xiàn)象說起。
什么是連接假死呢?如果底層的TCP連接已經(jīng)斷開,但是服務(wù)器端并沒有正常地關(guān)閉套接字,認(rèn)為這條連接仍然是存在的。
連接假死的具體表現(xiàn)如下:
- 在服務(wù)器端,會有一些處于TCP_ESTABLISHED狀態(tài)的正常連接
- 在客戶端,TCP客戶端已經(jīng)顯示連接已經(jīng)斷開
- 客戶端此時雖然可以進行斷線重連操作,但是上一次連接狀態(tài)依然被服務(wù)器端認(rèn)為有效,并且服務(wù)器端的資源得不到正確釋放,包括套接字上下文以及接受/發(fā)送緩沖區(qū)
連接假死的情況雖然不常見,但是確實存在。服務(wù)器端長時間運行后,會面臨大量假死連接得不到釋放的情況。由于每個連接都會消耗CPU和內(nèi)存資源,因此大量假死的連接會逐漸耗光服務(wù)器的資源,使得服務(wù)器越來越慢,IO處理效率越來越低,最終導(dǎo)致服務(wù)器崩潰。
連接假死通常是由多個原因造成的:
- 應(yīng)用程序出現(xiàn)線程堵塞,無法進行連接的讀寫
- 網(wǎng)絡(luò)相關(guān)的設(shè)別出現(xiàn)故障
- 網(wǎng)絡(luò)丟包
解決假死的有效手段是客戶端定時進行心跳檢測,服務(wù)端定時進行空閑檢測。
二、服務(wù)器端的空閑檢測
想解決假死問題,服務(wù)器端的有效手段是空閑檢測。所謂空閑檢測就是每隔一段時間監(jiān)測子通道是否有數(shù)據(jù)讀寫,如果有則子通道是正常的,如果沒有則判定為假死,關(guān)閉子通道。
服務(wù)器端實現(xiàn)空閑檢測只需要使用Netty自帶的IdleStateHandler空閑狀態(tài)處理器就可以實現(xiàn)這個功能。
@Slf4j public class HeartBeatServerHandler extends IdleStateHandler { private static final int READ_IDLE_GAP = 150; // 最大空閑時間(s) public HeartBeatServerHandler() { super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { log.info("{}秒內(nèi)未讀到數(shù)據(jù),關(guān)閉連接", READ_IDLE_GAP); // 其他處理,如關(guān)閉會話 } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 判斷消息實例 if (!(msg instanceof MessageProtos.Message message)) { super.channelRead(ctx, msg); return; } if (message.getType() == MessageProtos.HeadType.HEART_BEAT) { if (ctx.channel().isActive()) { // 將心跳數(shù)據(jù)包直接回給客戶端 ctx.writeAndFlush(msg); } } super.channelRead(ctx, msg); } }
在HeartBeatServerHandler的構(gòu)造函數(shù)中,調(diào)用了基類IdleStateHandler的構(gòu)造函數(shù),傳遞了四個參數(shù):
- 入站空閑檢測時長:指的是一段時間內(nèi)如果沒有消息入站就判定為連接假死
- 出站空閑檢測時長:指的是一段時間內(nèi)如果沒有數(shù)據(jù)出站就判定為連接假死
- 出/入站檢測時長:表示在一段時間內(nèi)如果沒有出站或者入站就判定為連接假死
- 時間單位
判定為假死之后IdleStateHandler會回調(diào)自己的channelIdle()方法,一般在這個方法中去進行一些連接的關(guān)閉。
HeartBeatServerHandler實現(xiàn)的主要功能是空閑檢測,需要客戶端定時發(fā)送心跳數(shù)據(jù)包(或報文、消息)進行配合,而且客戶端發(fā)送心跳數(shù)據(jù)包的時間間隔需要遠遠小于服務(wù)器端的空閑檢測時間間隔。
收到客戶端的心跳數(shù)據(jù)包之后可以直接回復(fù)客戶端,讓客戶端也能進行類似的空閑檢測。由于IdleStateHandler本身是一個入站處理器,只需要重寫這個子類的channelRead方法,然后將心跳數(shù)據(jù)包直接寫回給客戶端即可。
如果HeartBeatServerHandler要重寫channelRead方法,一定要調(diào)用積累的channelRead方法,不然IdleStateHandler的入站空閑檢測會無效。
三、客戶端的心跳報文
與服務(wù)器端的空閑檢測相配合,客戶端需要定期發(fā)送數(shù)據(jù)包到服務(wù)器端,通常這個數(shù)據(jù)包稱為心跳數(shù)據(jù)包。
@Slf4j public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter { // 心跳的時間間隔,單位為秒 private static final int HEART_BEAT_INTERVAL = 50; // 在Handler業(yè)務(wù)處理器被加入到流水線時開始發(fā)送心跳數(shù)據(jù)包 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ClientSession session = ctx.channel().attr(ClientSession.CLIENT_SESSION).get(); MessageProtos.MessageHeartBeat heartBeat = MessageProtos.MessageHeartBeat.newBuilder() .setSeq(0) .setJson("{\"from\":\"client\"}") .setUid(session.getUserDTO().getUserId()) .build(); MessageProtos.Message message = MessageProtos.Message.newBuilder() .setType(MessageProtos.HeadType.HEART_BEAT) .setSessionId(session.getSessionId()) .setMessageHeartBeat(heartBeat) .build(); heartBeat(ctx, message); super.handlerAdded(ctx); } private void heartBeat(ChannelHandlerContext ctx, MessageProtos.Message message) { // 提交在給定延遲后啟用的一次性任務(wù)。 ctx.executor().schedule(() -> { if (ctx.channel().isActive()) { log.info("發(fā)送心跳消息給服務(wù)端"); ctx.writeAndFlush(message); // 遞歸調(diào)用,發(fā)送下一次的心跳 heartBeat(ctx, message); } }, HEART_BEAT_INTERVAL, TimeUnit.SECONDS); } // 接收到服務(wù)器的心跳回寫 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof MessageProtos.Message message)) { super.channelRead(ctx, msg); return; } if (message.getType() == MessageProtos.HeadType.HEART_BEAT) { log.info("收到會寫的心跳信息"); } else { super.channelRead(ctx, msg); } } }
在HeartBeatClientHandler實例被加入到流水線時,它重寫的handlerAdded方法被回調(diào)。在handlerAdded方法中開始調(diào)用heartBeat方法發(fā)送心跳數(shù)據(jù)包。heartBeat是一個不斷遞歸調(diào)用的方法,它使用了ctx.executor()獲取當(dāng)前通道綁定的Reactor反應(yīng)器NIO線,然后通過NIO現(xiàn)線程的schedule定時調(diào)度方法,在50s后觸發(fā)這個方法的執(zhí)行,再之后遞歸調(diào)用同樣延時50s后繼續(xù)發(fā)送。
客戶端的心跳間隔要比服務(wù)器端的空閑檢測時間間隔要短,一般來說要比它的一半要短一些,可以直接定義為空閑檢測時間間隔的1/3,以防止公網(wǎng)偶發(fā)的秒級抖動。
HeartBeatClientHandler實例并不是一開始就裝配到流水線中的,它裝配的實際實在登錄成功之后。
HeartBeatClientHandler實際上也可以集成IdleStateHandler類在客戶端進行空閑檢測,這樣客戶端也可以對服務(wù)器進行假死判定,在服務(wù)器假死的情況下,客戶端可以發(fā)起重連。
相關(guān)文章

如果淘寶的七天自動確認(rèn)收貨讓你設(shè)計你用Java怎么實現(xiàn)

SpringCloud turbine監(jiān)控實現(xiàn)過程解析