欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java WebSocket客戶端斷線重連的實(shí)現(xiàn)方法

 更新時(shí)間:2021年10月27日 17:05:14   作者:劍客阿良_ALiang  
在工作中是否會(huì)遇到實(shí)用websocket客戶端連接服務(wù)端的時(shí)候,網(wǎng)絡(luò)波動(dòng),服務(wù)端斷連的情況,本文可以直接使用的斷線重連,感興趣的可以了解一下

前言

在工作中是否會(huì)遇到實(shí)用websocket客戶端連接服務(wù)端的時(shí)候,網(wǎng)絡(luò)波動(dòng),服務(wù)端斷連的情況。會(huì)導(dǎo)致客戶端被動(dòng)斷開連接。為了解決這個(gè)問題,需要對(duì)被動(dòng)斷開連接的情況進(jìn)行捕獲,并重新創(chuàng)建連接。這篇文章主要是提供可以直接使用的斷線重連websocket客戶端代碼。

Maven依賴

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.1</version>
        </dependency>

代碼

不廢話,上代碼。

package ai.guiji.csdn.ws.client;
 
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
 
import javax.net.ssl.*;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
 
/** @Author huyi @Date 2021/10/15 20:03 @Description: 重連websocket客戶端 */
@Slf4j
public class ReConnectWebSocketClient {
  /** 字符串消息回調(diào) */
  private Consumer<String> msgStr;
  /** 字節(jié)流消息回調(diào) */
  private Consumer<ByteBuffer> msgByte;
  /** 異?;卣{(diào) */
  private Consumer<Exception> error;
  /** 連接標(biāo)識(shí) */
  private String key;
  /** ws服務(wù)端連接 */
  private URI serverUri;
  /** 嘗試重連標(biāo)識(shí) */
  private AtomicBoolean tryReconnect;
  /** 需要ping標(biāo)識(shí) */
  private AtomicBoolean needPing;
  /** websocket連接實(shí)體 */
  private WebSocketClient webSocketClient;
  /** 重連次數(shù) */
  private AtomicInteger reConnectTimes;
  /** 連接結(jié)束標(biāo)識(shí) */
  private AtomicBoolean end;
  /** 連接后初始發(fā)送報(bào)文,這里也可以不需要,如果服務(wù)端主動(dòng)斷開連接,重連后可以繼續(xù)推送報(bào)文的話。 */
  private String initReConnectReq;
  /** 結(jié)束回調(diào) */
  private Consumer<String> endConsumer;
 
  public ReConnectWebSocketClient(
      URI serverUri,
      String key,
      Consumer<String> msgStr,
      Consumer<ByteBuffer> msgByte,
      Consumer<Exception> error) {
    this.msgStr = msgStr;
    this.msgByte = msgByte;
    this.error = error;
    this.key = key;
    this.serverUri = serverUri;
    this.tryReconnect = new AtomicBoolean(false);
    this.needPing = new AtomicBoolean(true);
    this.reConnectTimes = new AtomicInteger(0);
    this.end = new AtomicBoolean(false);
    this.endConsumer = this::close;
    init();
  }
 
  /** 初始化連接 */
  public void init() {
    // 創(chuàng)建連接
    createWebSocketClient();
    // ping線程
    circlePing();
  }
 
  private void needReconnect() throws Exception {
    ThreadUtil.sleep(10, TimeUnit.SECONDS);
    int cul = reConnectTimes.incrementAndGet();
    if (cul > 3) {
      close("real stop");
      throw new Exception("服務(wù)端斷連,3次重連均失敗");
    }
    log.warn("[{}]第[{}]次斷開重連", key, cul);
    if (tryReconnect.get()) {
      log.error("[{}]第[{}]次斷開重連結(jié)果 -> 連接正在重連,本次重連請(qǐng)求放棄", key, cul);
      needReconnect();
      return;
    }
    try {
      tryReconnect.set(true);
 
      if (webSocketClient.isOpen()) {
        log.warn("[{}]第[{}]次斷開重連,關(guān)閉舊連接", key, cul);
        webSocketClient.closeConnection(2, "reconnect stop");
      }
      webSocketClient = null;
      createWebSocketClient();
      connect();
      if (StrUtil.hasBlank(initReConnectReq)) {
        send(initReConnectReq);
      }
    } catch (Exception exception) {
      log.error("[{}]第[{}]次斷開重連結(jié)果 -> 連接正在重連,重連異常:[{}]", key, cul, exception.getMessage());
      needReconnect();
    } finally {
      tryReconnect.set(false);
    }
  }
 
  private void createWebSocketClient() {
    webSocketClient =
        new WebSocketClient(serverUri) {
          @Override
          public void onOpen(ServerHandshake serverHandshake) {
            log.info("[{}]ReConnectWebSocketClient [onOpen]連接成功{}", key, getRemoteSocketAddress());
            tryReconnect.set(false);
          }
 
          @Override
          public void onMessage(String text) {
            log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text={}", key, text);
            msgStr.accept(text);
          }
 
          @Override
          public void onMessage(ByteBuffer bytes) {
            log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):bytes={}", key, bytes);
            msgByte.accept(bytes);
          }
 
          @Override
          public void onWebsocketPong(WebSocket conn, Framedata f) {
            log.info(
                "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode={}",
                key,
                f.getOpcode());
          }
 
          @Override
          public void onClose(int i, String s, boolean b) {
            log.info("[{}]ReConnectWebSocketClient [onClose]關(guān)閉,s={},b={}", key, s, b);
            if (StrUtil.hasBlank(s) || s.contains("https")) {
              if (end.get()) {
                return;
              }
              try {
                needReconnect();
              } catch (Exception exception) {
                endConsumer.accept("reconnect error");
                error.accept(exception);
              }
            }
          }
 
          @Override
          public void onError(Exception e) {
            log.info("[{}]ReConnectWebSocketClient [onError]異常,e={}", key, e);
            endConsumer.accept("error close");
            error.accept(e);
          }
        };
    if (serverUri.toString().contains("wss://")) {
      trustAllHosts(webSocketClient);
    }
  }
 
  public void circlePing() {
    new Thread(
            () -> {
              while (needPing.get()) {
                if (webSocketClient.isOpen()) {
                  webSocketClient.sendPing();
                }
                ThreadUtil.sleep(5, TimeUnit.SECONDS);
              }
              log.warn("[{}]Ping循環(huán)關(guān)閉", key);
            })
        .start();
  }
 
  /**
   * 連接
   *
   * @throws Exception 異常
   */
  public void connect() throws Exception {
    webSocketClient.connectBlocking(10, TimeUnit.SECONDS);
  }
 
  /**
   * 發(fā)送
   *
   * @param msg 消息
   * @throws Exception 異常
   */
  public void send(String msg) throws Exception {
    this.initReConnectReq = msg;
    if (webSocketClient.isOpen()) {
      webSocketClient.send(msg);
    }
  }
 
  /**
   * 關(guān)閉
   *
   * @param msg 關(guān)閉消息
   */
  public void close(String msg) {
    needPing.set(false);
    end.set(true);
    if (webSocketClient != null) {
      webSocketClient.closeConnection(3, msg);
    }
  }
 
  /**
   * 忽略證書
   *
   * @param client
   */
  public void trustAllHosts(WebSocketClient client) {
    TrustManager[] trustAllCerts =
        new TrustManager[] {
          new X509ExtendedTrustManager() {
 
            @Override
            public void checkClientTrusted(
                X509Certificate[] x509Certificates, String s, Socket socket)
                throws CertificateException {}
 
            @Override
            public void checkServerTrusted(
                X509Certificate[] x509Certificates, String s, Socket socket)
                throws CertificateException {}
 
            @Override
            public void checkClientTrusted(
                X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
                throws CertificateException {}
 
            @Override
            public void checkServerTrusted(
                X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
                throws CertificateException {}
 
            @Override
            public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
                throws CertificateException {}
 
            @Override
            public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
                throws CertificateException {}
 
            @Override
            public X509Certificate[] getAcceptedIssuers() {
              return null;
            }
          }
        };
 
    try {
      SSLContext ssl = SSLContext.getInstance("SSL");
      ssl.init(null, trustAllCerts, new java.security.SecureRandom());
      SSLSocketFactory socketFactory = ssl.getSocketFactory();
      client.setSocketFactory(socketFactory);
    } catch (Exception e) {
      log.error("ReConnectWebSocketClient trustAllHosts 異常,e={0}", e);
    }
  }
}

代碼說明:

1、參數(shù)的重連次數(shù)可以配置。

2、增加異步pingpong線程,一旦結(jié)束連接會(huì)自動(dòng)關(guān)閉。

3、對(duì)字符串、字節(jié)流、異常都有回調(diào)措施。

測(cè)試代碼方法

  public static void main(String[] args) throws Exception {
    ReConnectWebSocketClient client =
        new ReConnectWebSocketClient(
            new URI(String.format("wss://192.168.1.77:24009")),
            "test",
            // 字符串消息處理
            msg -> {
              // todo 字符串消息處理
              System.out.println("字符串消息:" + msg);
            },
            null,
            // 異?;卣{(diào)
            error -> {
              // todo 字符串消息處理
              System.out.println("異常:" + error.getMessage());
            });
    client.connect();
    client.send("haha");
  }

驗(yàn)證結(jié)果

16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次斷開重連
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]連接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服務(wù)端數(shù)據(jù):text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服務(wù)端數(shù)據(jù):opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次斷開重連
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]異常,e={}
java.net.ConnectException: Connection refused: connect
 at java.net.DualStackPlainSocketImpl.connect0(Native Method)
 at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
 at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 at java.net.Socket.connect(Socket.java:589)
 at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
 at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
 at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]關(guān)閉,s=error close,b=false
異常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循環(huán)關(guān)閉

這里我才用的是手動(dòng)關(guān)閉服務(wù)端方式觸發(fā),客戶端被動(dòng)斷連情況。重連兩次,第二次服務(wù)端還未啟動(dòng)導(dǎo)致異常觸發(fā)。

到此這篇關(guān)于java WebSocket客戶端斷線重連的實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)java WebSocket客戶端斷線重連內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring執(zhí)行sql腳本文件的方法

    Spring執(zhí)行sql腳本文件的方法

    這篇文章主要介紹了Spring執(zhí)行sql腳本文件的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2019-03-03
  • JPA如何使用findBy方法自定義查詢

    JPA如何使用findBy方法自定義查詢

    這篇文章主要介紹了JPA如何使用findBy方法自定義查詢,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 使用Java方法配置Spring代碼解析

    使用Java方法配置Spring代碼解析

    這篇文章主要介紹了使用Java方法配置Spring代碼解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • gRPC實(shí)踐之proto及Maven插件概念及使用詳解

    gRPC實(shí)踐之proto及Maven插件概念及使用詳解

    這篇文章主要為大家介紹了gRPC實(shí)踐之proto及Maven插件概念及使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Java泛型初學(xué)者之上、下界通配符的深入理解

    Java泛型初學(xué)者之上、下界通配符的深入理解

    這篇文章主要給大家介紹了關(guān)于Java泛型初學(xué)者之上、下界通配符的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者實(shí)用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • 一篇文章教你如何用多種迭代寫法實(shí)現(xiàn)二叉樹遍歷

    一篇文章教你如何用多種迭代寫法實(shí)現(xiàn)二叉樹遍歷

    這篇文章主要介紹了C語(yǔ)言實(shí)現(xiàn)二叉樹遍歷的迭代算法,包括二叉樹的中序遍歷、先序遍歷及后序遍歷等,是非常經(jīng)典的算法,需要的朋友可以參考下
    2021-08-08
  • Java異常分類以及幾種處理機(jī)制分析講解

    Java異常分類以及幾種處理機(jī)制分析講解

    在Java的廣闊宇宙中,有一群特殊的“超級(jí)英雄”,它們?cè)诖a世界中穿梭,守護(hù)著程序的正常運(yùn)行——它們就是“異常”,這些英雄們,各司其職,保護(hù)著程序免受錯(cuò)誤的侵?jǐn)_,今天,我們將深入這個(gè)神秘的世界,全面解析異常的分類,掌握異常的處理機(jī)制
    2024-07-07
  • Spring Cloud出現(xiàn)Options Forbidden 403問題解決方法

    Spring Cloud出現(xiàn)Options Forbidden 403問題解決方法

    本篇文章主要介紹了Spring Cloud出現(xiàn)Options Forbidden 403問題解決方法,具有一定的參考價(jià)值,有興趣的可以了解一下
    2017-11-11
  • 通過Java代碼技巧改善性能

    通過Java代碼技巧改善性能

    在本篇文章里小編給大家分享了關(guān)于通過Java代碼技巧改善性能的相關(guān)知識(shí)點(diǎn),需要的朋友們參考下。
    2019-05-05
  • Mybatis流式查詢并實(shí)現(xiàn)將結(jié)果分批寫入文件

    Mybatis流式查詢并實(shí)現(xiàn)將結(jié)果分批寫入文件

    這篇文章主要介紹了Mybatis流式查詢并實(shí)現(xiàn)將結(jié)果分批寫入文件方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-08-08

最新評(píng)論