欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決

 更新時間:2022年11月29日 10:52:04   作者:夢想實現(xiàn)家_Z  
這篇文章主要為大家介紹了RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息原因及解決方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

目前有兩套RocketMQ集群,集群A包含topic名稱為cluster_A_topic,集群B包含topic名稱為cluster_B_topic,在應(yīng)用服務(wù)OrderApp上通過RocketMQ Client創(chuàng)建兩個DefaultMQProducer實例發(fā)送消息給集群A和集群B

架構(gòu)圖如下:

根據(jù)上述架構(gòu)圖,我們給出的示例代碼如下:

// 創(chuàng)建第一個DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    // 設(shè)置nameServer地址
    producer1.setNamesrvAddr("192.168.2.230:9876");
    try {
      producer1.start();
      // 發(fā)送消息
      SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result1.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_A_topic 發(fā)送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_A_topic 持久化失??!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_A_topic 同步slave失?。?);
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_A_topic 副本不可用!");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    // 創(chuàng)建第二個DefaultMQProducer
    DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
    // 設(shè)置nameServer地址
    producer2.setNamesrvAddr("192.168.2.231:9876");
    try {
      producer2.start();
      // 發(fā)送消息
      SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result2.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_B_topic 發(fā)送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_B_topic 持久化失??!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_B_topic 同步slave失?。?);
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_B_topic 副本不可用!");
      }
      return "ok";
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer1.shutdown();
      producer2.shutdown();
    }

結(jié)果竟然報錯了,報錯內(nèi)容時cluster_B_topic不存在:

經(jīng)過不斷的測試,發(fā)現(xiàn)只有放在最前面啟動的DefaultMQProducer會生效,后面啟動的DefaultMQProducer發(fā)送消息就報錯說對應(yīng)的topic不存在,而且報錯的broker竟然是前面啟動的DefaultMQProducer對應(yīng)的broker。這就不科學(xué)了,難道RocketMQ不允許在一個應(yīng)用上創(chuàng)建多個生產(chǎn)者?

問題定位

首先說明一下,當(dāng)前使用的RocketMQ Client版本是4.8.0。為了確定是哪兒出了問題,不得不對源碼來一波探索[哭泣臉??]。

我們都知道生產(chǎn)者是發(fā)送消息給Broker的,獲取Broker信息是通過連接NameServer獲取的。既然報錯的Broker和目標(biāo)Broker竟然不對應(yīng),肯定是后面啟動的生產(chǎn)者獲取的Broker不對。有了最基本的判斷,我們先從DefaultMQProducer#start()入手,最終我們定位到這樣一段代碼DefaultMQProducerImpl#start(final boolean startFactory)

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
// 如果生產(chǎn)者group名稱不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
            // 創(chuàng)建MQClientInstance實例
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 注冊生產(chǎn)者實例到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
            // 添加TBW102對應(yīng)的topic信息,broker設(shè)置autoCreateTopicEnable = true才起作用
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    // 啟動剛剛創(chuàng)建的MQClientInstance實例
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 修改服務(wù)狀態(tài)為RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

上面的代碼主要是創(chuàng)建了MQClientInstance實例,并且通過start()方法啟動。

通過針對這兩段代碼的debug,我們發(fā)現(xiàn)創(chuàng)建的兩個DefaultMQProducer對象是共用了一個MQClientInstance實例,并且所有針對NameServerBroker的遠程操作全部是通過MQClientInstance實例來做的。比如發(fā)送消息的時候需要找到對應(yīng)的Broker下的消息隊列:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 從NameServer更新topic路由
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

最終我們發(fā)現(xiàn)兩個DefaultMQProducer對象都是去同一個NameServer下獲取對應(yīng)的topic信息,這下問題就定位到了:因為使用了同一個MQClientInstance實例導(dǎo)致不同的DefaultMQProducer去訪問了同一個NameServer,同一個集群需要同時接收兩個topic的消息,也就出現(xiàn)了前面的報錯說topic不存在的情況。

如何解決

我們來看看MQClientInstance實例是如何保證唯一性的:

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // 生成clientID
        String clientId = clientConfig.buildMQClientId();
        // 從緩存中獲取MQClientInstance
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            // 沒有緩存的話就創(chuàng)建一個MQClientInstance
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // 新創(chuàng)建出來的再放進緩存
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        // 返回MQClientInstance實例
        return instance;
    }

我們之所以拿到的MQClientInstance實例是同一個,是因為在同一個服務(wù)下創(chuàng)建的clientId相同:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
        return sb.toString();
    }

兩個clientId都是192.168.18.173@14933,為了防止clientId相同,我們可以在創(chuàng)建DefaultMQProducer實例是加上unitName值,保證兩個unitName值不同來避免共享同一個MQClientInstance。

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();

通過上述代碼修改后,兩個消息都發(fā)送成功了。

另一個辦法就是升級RocketMQ Client4.9.0,我們來看一下RocketMQ Client 4.9.0是怎么解決這個問題的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
        }
    }

RocketMQ Client 4.9.0在后面補充了一個納秒值,之前的代碼是這樣的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }

也就是說,在新的版本中,一個應(yīng)用服務(wù)內(nèi)創(chuàng)建多個DefaultMQProducer就會有多個MQClientInstance實例對應(yīng),不會再出現(xiàn)我們前面的報錯。

以上就是RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決的詳細內(nèi)容,更多關(guān)于RocketMQ發(fā)送NameServer的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū)

    淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū)

    這篇文章主要介紹了淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • IDEA maven依賴錯誤中包下面紅色波浪線

    IDEA maven依賴錯誤中包下面紅色波浪線

    這篇文章主要介紹了IDEA maven依賴錯誤中包下面紅色波浪線,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • ToStringBuilder類的一些心得

    ToStringBuilder類的一些心得

    ToStringBuilder類的一些心得,需要的朋友可以參考一下
    2013-02-02
  • java 如何從字符串里面提取時間

    java 如何從字符串里面提取時間

    這篇文章主要介紹了java實現(xiàn)從字符串里面提取時間的方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java實現(xiàn)獲取安卓設(shè)備里已安裝的軟件包

    java實現(xiàn)獲取安卓設(shè)備里已安裝的軟件包

    本文給大家介紹的是如何獲取設(shè)備中已經(jīng)安裝的應(yīng)用軟件包的代碼,其核心方法原理很簡單,我們通過Android中提供的PackageManager類,來獲取手機中安裝的應(yīng)用程序信息
    2015-10-10
  • spring mvc DispatcherServlet之前端控制器架構(gòu)詳解

    spring mvc DispatcherServlet之前端控制器架構(gòu)詳解

    這篇文章主要為大家詳細介紹了spring mvc DispatcherServlet之前端控制器架構(gòu),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • Java程序單實例運行的簡單實現(xiàn)

    Java程序單實例運行的簡單實現(xiàn)

    這篇文章主要介紹了Java程序單實例運行的簡單實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 深入理解JDK8中Stream使用

    深入理解JDK8中Stream使用

    Stream 是 Java8 中處理集合的關(guān)鍵抽象概念,它可以指定你希望對集合進行的操作,可以執(zhí)行非常復(fù)雜的查找、過濾和映射數(shù)據(jù)等操作。這篇文章主要介紹了JDK8中Stream使用解析,需要的朋友可以參考下
    2021-06-06
  • Java Math類的三個方法ceil,floor,round用法

    Java Math類的三個方法ceil,floor,round用法

    這篇文章主要介紹了Java Math類的三個方法ceil,floor,round用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Mybatis order by 動態(tài)傳參出現(xiàn)的問題及解決方法

    Mybatis order by 動態(tài)傳參出現(xiàn)的問題及解決方法

    今天,我正在愉快地CRUD,突然發(fā)現(xiàn)出現(xiàn)一個Bug,我們來看看是怎么回事吧!接下來通過本文給大家介紹Mybatis order by 動態(tài)傳參出現(xiàn)的一個小bug,需要的朋友可以參考下
    2021-07-07

最新評論