RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決
前言
目前有兩套RocketMQ集群,集群A包含topic
名稱為cluster_A_topic
,集群B包含topic
名稱為cluster_B_topic
,在應(yīng)用服務(wù)OrderApp
上通過RocketMQ Client
創(chuàng)建兩個DefaultMQProducer
實例發(fā)送消息給集群A和集群B
架構(gòu)圖如下:
根據(jù)上述架構(gòu)圖,我們給出的示例代碼如下:
// 創(chuàng)建第一個DefaultMQProducer DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); // 設(shè)置nameServer地址 producer1.setNamesrvAddr("192.168.2.230:9876"); try { producer1.start(); // 發(fā)送消息 SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result1.getSendStatus()) { case SEND_OK: System.out.println("cluster_A_topic 發(fā)送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_A_topic 持久化失??!"); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_A_topic 同步slave失?。?); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_A_topic 副本不可用!"); } } catch (Exception e) { e.printStackTrace(); } // 創(chuàng)建第二個DefaultMQProducer DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2"); // 設(shè)置nameServer地址 producer2.setNamesrvAddr("192.168.2.231:9876"); try { producer2.start(); // 發(fā)送消息 SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result2.getSendStatus()) { case SEND_OK: System.out.println("cluster_B_topic 發(fā)送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_B_topic 持久化失??!"); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_B_topic 同步slave失?。?); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_B_topic 副本不可用!"); } return "ok"; } catch (Exception e) { e.printStackTrace(); } finally { producer1.shutdown(); producer2.shutdown(); }
結(jié)果竟然報錯了,報錯內(nèi)容時cluster_B_topic
不存在:
經(jīng)過不斷的測試,發(fā)現(xiàn)只有放在最前面啟動的DefaultMQProducer
會生效,后面啟動的DefaultMQProducer
發(fā)送消息就報錯說對應(yīng)的topic
不存在,而且報錯的broker
竟然是前面啟動的DefaultMQProducer
對應(yīng)的broker
。這就不科學(xué)了,難道RocketMQ不允許在一個應(yīng)用上創(chuàng)建多個生產(chǎn)者?
問題定位
首先說明一下,當(dāng)前使用的RocketMQ Client
版本是4.8.0
。為了確定是哪兒出了問題,不得不對源碼來一波探索[哭泣臉??]。
我們都知道生產(chǎn)者是發(fā)送消息給Broker
的,獲取Broker
信息是通過連接NameServer
獲取的。既然報錯的Broker
和目標(biāo)Broker
竟然不對應(yīng),肯定是后面啟動的生產(chǎn)者獲取的Broker
不對。有了最基本的判斷,我們先從DefaultMQProducer#start()
入手,最終我們定位到這樣一段代碼DefaultMQProducerImpl#start(final boolean startFactory)
:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); // 如果生產(chǎn)者group名稱不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 創(chuàng)建MQClientInstance實例 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 注冊生產(chǎn)者實例到MQClientInstance中 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 添加TBW102對應(yīng)的topic信息,broker設(shè)置autoCreateTopicEnable = true才起作用 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { // 啟動剛剛創(chuàng)建的MQClientInstance實例 mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); // 修改服務(wù)狀態(tài)為RUNNING this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
上面的代碼主要是創(chuàng)建了MQClientInstance
實例,并且通過start()
方法啟動。
通過針對這兩段代碼的debug,我們發(fā)現(xiàn)創(chuàng)建的兩個DefaultMQProducer
對象是共用了一個MQClientInstance
實例,并且所有針對NameServer
和Broker
的遠程操作全部是通過MQClientInstance
實例來做的。比如發(fā)送消息的時候需要找到對應(yīng)的Broker
下的消息隊列:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 從NameServer更新topic路由 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
最終我們發(fā)現(xiàn)兩個DefaultMQProducer
對象都是去同一個NameServer
下獲取對應(yīng)的topic
信息,這下問題就定位到了:因為使用了同一個MQClientInstance
實例導(dǎo)致不同的DefaultMQProducer
去訪問了同一個NameServer
,同一個集群需要同時接收兩個topic
的消息,也就出現(xiàn)了前面的報錯說topic
不存在的情況。
如何解決
我們來看看MQClientInstance
實例是如何保證唯一性的:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { // 生成clientID String clientId = clientConfig.buildMQClientId(); // 從緩存中獲取MQClientInstance MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { // 沒有緩存的話就創(chuàng)建一個MQClientInstance instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); // 新創(chuàng)建出來的再放進緩存 MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } // 返回MQClientInstance實例 return instance; }
我們之所以拿到的MQClientInstance
實例是同一個,是因為在同一個服務(wù)下創(chuàng)建的clientId
相同:
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
兩個clientId
都是192.168.18.173@14933
,為了防止clientId
相同,我們可以在創(chuàng)建DefaultMQProducer
實例是加上unitName
值,保證兩個unitName
值不同來避免共享同一個MQClientInstance
。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); producer1.setNamesrvAddr("192.168.2.230:9876"); producer1.setUnitName("producer1"); producer1.start(); DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1"); producer2.setNamesrvAddr("192.168.2.231:9876"); producer2.setUnitName("producer2"); producer2.start();
通過上述代碼修改后,兩個消息都發(fā)送成功了。
另一個辦法就是升級RocketMQ Client
到4.9.0
,我們來看一下RocketMQ Client 4.9.0
是怎么解決這個問題的:
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); } }
RocketMQ Client 4.9.0
在后面補充了一個納秒值,之前的代碼是這樣的:
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); } }
也就是說,在新的版本中,一個應(yīng)用服務(wù)內(nèi)創(chuàng)建多個DefaultMQProducer
就會有多個MQClientInstance
實例對應(yīng),不會再出現(xiàn)我們前面的報錯。
以上就是RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決的詳細內(nèi)容,更多關(guān)于RocketMQ發(fā)送NameServer的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū)
這篇文章主要介紹了淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04java實現(xiàn)獲取安卓設(shè)備里已安裝的軟件包
本文給大家介紹的是如何獲取設(shè)備中已經(jīng)安裝的應(yīng)用軟件包的代碼,其核心方法原理很簡單,我們通過Android中提供的PackageManager類,來獲取手機中安裝的應(yīng)用程序信息2015-10-10spring mvc DispatcherServlet之前端控制器架構(gòu)詳解
這篇文章主要為大家詳細介紹了spring mvc DispatcherServlet之前端控制器架構(gòu),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-04-04Java Math類的三個方法ceil,floor,round用法
這篇文章主要介紹了Java Math類的三個方法ceil,floor,round用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Mybatis order by 動態(tài)傳參出現(xiàn)的問題及解決方法
今天,我正在愉快地CRUD,突然發(fā)現(xiàn)出現(xiàn)一個Bug,我們來看看是怎么回事吧!接下來通過本文給大家介紹Mybatis order by 動態(tài)傳參出現(xiàn)的一個小bug,需要的朋友可以參考下2021-07-07