java WebSocket客戶端斷線重連的實(shí)現(xiàn)方法
前言
在工作中是否會(huì)遇到實(shí)用websocket客戶端連接服務(wù)端的時(shí)候,網(wǎng)絡(luò)波動(dòng),服務(wù)端斷連的情況。會(huì)導(dǎo)致客戶端被動(dòng)斷開連接。為了解決這個(gè)問題,需要對(duì)被動(dòng)斷開連接的情況進(jìn)行捕獲,并重新創(chuàng)建連接。這篇文章主要是提供可以直接使用的斷線重連websocket客戶端代碼。
Maven依賴
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.2</version> </dependency> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.1</version> </dependency>
代碼
不廢話,上代碼。
package ai.guiji.csdn.ws.client; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.java_websocket.WebSocket; import org.java_websocket.client.WebSocketClient; import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ServerHandshake; import javax.net.ssl.*; import java.net.Socket; import java.net.URI; import java.nio.ByteBuffer; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @Author huyi @Date 2021/10/15 20:03 @Description: 重連websocket客戶端 */ @Slf4j public class ReConnectWebSocketClient { /** 字符串消息回調(diào) */ private Consumer<String> msgStr; /** 字節(jié)流消息回調(diào) */ private Consumer<ByteBuffer> msgByte; /** 異?;卣{(diào) */ private Consumer<Exception> error; /** 連接標(biāo)識(shí) */ private String key; /** ws服務(wù)端連接 */ private URI serverUri; /** 嘗試重連標(biāo)識(shí) */ private AtomicBoolean tryReconnect; /** 需要ping標(biāo)識(shí) */ private AtomicBoolean needPing; /** websocket連接實(shí)體 */ private WebSocketClient webSocketClient; /** 重連次數(shù) */ private AtomicInteger reConnectTimes; /** 連接結(jié)束標(biāo)識(shí) */ private AtomicBoolean end; /** 連接后初始發(fā)送報(bào)文,這里也可以不需要,如果服務(wù)端主動(dòng)斷開連接,重連后可以繼續(xù)推送報(bào)文的話。 */ private String initReConnectReq; /** 結(jié)束回調(diào) */ private Consumer<String> endConsumer; public ReConnectWebSocketClient( URI serverUri, String key, Consumer<String> msgStr, Consumer<ByteBuffer> msgByte, Consumer<Exception> error) { this.msgStr = msgStr; this.msgByte = msgByte; this.error = error; this.key = key; this.serverUri = serverUri; this.tryReconnect = new AtomicBoolean(false); this.needPing = new AtomicBoolean(true); this.reConnectTimes = new AtomicInteger(0); this.end = new AtomicBoolean(false); this.endConsumer = this::close; init(); } /** 初始化連接 */ public void init() { // 創(chuàng)建連接 createWebSocketClient(); // ping線程 circlePing(); } private void needReconnect() throws Exception { ThreadUtil.sleep(10, TimeUnit.SECONDS); int cul = reConnectTimes.incrementAndGet(); if (cul > 3) { close("real stop"); throw new Exception("服務(wù)端斷連,3次重連均失敗"); } log.warn("[{}]第[{}]次斷開重連", key, cul); if (tryReconnect.get()) { log.error("[{}]第[{}]次斷開重連結(jié)果 -> 連接正在重連,本次重連請(qǐng)求放棄", key, cul); needReconnect(); return; } try { tryReconnect.set(true); if (webSocketClient.isOpen()) { log.warn("[{}]第[{}]次斷開重連,關(guān)閉舊連接", key, cul); webSocketClient.closeConnection(2, "reconnect stop"); } webSocketClient = null; createWebSocketClient(); connect(); if (StrUtil.hasBlank(initReConnectReq)) { send(initReConnectReq); } } catch (Exception exception) { log.error("[{}]第[{}]次斷開重連結(jié)果 -> 連接正在重連,重連異常:[{}]", key, cul, exception.getMessage()); needReconnect(); } finally { tryReconnect.set(false); } } private void createWebSocketClient() { webSocketClient = new WebSocketClient(serverUri) { @Override public void onOpen(ServerHandshake serverHandshake) { log.info("[{}]ReConnectWebSocketClient [onOpen]連接成功{}", key, getRemoteSocketAddress()); tryReconnect.set(false); } @Override public void onMessage(String text) { log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text={}", key, text); msgStr.accept(text); } @Override public void onMessage(ByteBuffer bytes) { log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):bytes={}", key, bytes); msgByte.accept(bytes); } @Override public void onWebsocketPong(WebSocket conn, Framedata f) { log.info( "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode={}", key, f.getOpcode()); } @Override public void onClose(int i, String s, boolean b) { log.info("[{}]ReConnectWebSocketClient [onClose]關(guān)閉,s={},b={}", key, s, b); if (StrUtil.hasBlank(s) || s.contains("https")) { if (end.get()) { return; } try { needReconnect(); } catch (Exception exception) { endConsumer.accept("reconnect error"); error.accept(exception); } } } @Override public void onError(Exception e) { log.info("[{}]ReConnectWebSocketClient [onError]異常,e={}", key, e); endConsumer.accept("error close"); error.accept(e); } }; if (serverUri.toString().contains("wss://")) { trustAllHosts(webSocketClient); } } public void circlePing() { new Thread( () -> { while (needPing.get()) { if (webSocketClient.isOpen()) { webSocketClient.sendPing(); } ThreadUtil.sleep(5, TimeUnit.SECONDS); } log.warn("[{}]Ping循環(huán)關(guān)閉", key); }) .start(); } /** * 連接 * * @throws Exception 異常 */ public void connect() throws Exception { webSocketClient.connectBlocking(10, TimeUnit.SECONDS); } /** * 發(fā)送 * * @param msg 消息 * @throws Exception 異常 */ public void send(String msg) throws Exception { this.initReConnectReq = msg; if (webSocketClient.isOpen()) { webSocketClient.send(msg); } } /** * 關(guān)閉 * * @param msg 關(guān)閉消息 */ public void close(String msg) { needPing.set(false); end.set(true); if (webSocketClient != null) { webSocketClient.closeConnection(3, msg); } } /** * 忽略證書 * * @param client */ public void trustAllHosts(WebSocketClient client) { TrustManager[] trustAllCerts = new TrustManager[] { new X509ExtendedTrustManager() { @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; try { SSLContext ssl = SSLContext.getInstance("SSL"); ssl.init(null, trustAllCerts, new java.security.SecureRandom()); SSLSocketFactory socketFactory = ssl.getSocketFactory(); client.setSocketFactory(socketFactory); } catch (Exception e) { log.error("ReConnectWebSocketClient trustAllHosts 異常,e={0}", e); } } }
代碼說明:
1、參數(shù)的重連次數(shù)可以配置。
2、增加異步pingpong線程,一旦結(jié)束連接會(huì)自動(dòng)關(guān)閉。
3、對(duì)字符串、字節(jié)流、異常都有回調(diào)措施。
測(cè)試代碼方法
public static void main(String[] args) throws Exception { ReConnectWebSocketClient client = new ReConnectWebSocketClient( new URI(String.format("wss://192.168.1.77:24009")), "test", // 字符串消息處理 msg -> { // todo 字符串消息處理 System.out.println("字符串消息:" + msg); }, null, // 異?;卣{(diào) error -> { // todo 字符串消息處理 System.out.println("異常:" + error.getMessage()); }); client.connect(); client.send("haha"); }
驗(yàn)證結(jié)果
16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次斷開重連
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次斷開重連
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]異常,e={}
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=error close,b=false
異常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循環(huán)關(guān)閉
這里我才用的是手動(dòng)關(guān)閉服務(wù)端方式觸發(fā),客戶端被動(dòng)斷連情況。重連兩次,第二次服務(wù)端還未啟動(dòng)導(dǎo)致異常觸發(fā)。
到此這篇關(guān)于java WebSocket客戶端斷線重連的實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)java WebSocket客戶端斷線重連內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
gRPC實(shí)踐之proto及Maven插件概念及使用詳解
這篇文章主要為大家介紹了gRPC實(shí)踐之proto及Maven插件概念及使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04一篇文章教你如何用多種迭代寫法實(shí)現(xiàn)二叉樹遍歷
這篇文章主要介紹了C語(yǔ)言實(shí)現(xiàn)二叉樹遍歷的迭代算法,包括二叉樹的中序遍歷、先序遍歷及后序遍歷等,是非常經(jīng)典的算法,需要的朋友可以參考下2021-08-08Spring Cloud出現(xiàn)Options Forbidden 403問題解決方法
本篇文章主要介紹了Spring Cloud出現(xiàn)Options Forbidden 403問題解決方法,具有一定的參考價(jià)值,有興趣的可以了解一下2017-11-11Mybatis流式查詢并實(shí)現(xiàn)將結(jié)果分批寫入文件
這篇文章主要介紹了Mybatis流式查詢并實(shí)現(xiàn)將結(jié)果分批寫入文件方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08