RocketMQ源碼解析topic創(chuàng)建機制詳解
1. RocketMQ Topic創(chuàng)建機制
以下源碼基于Rocket MQ 4.7.0
RocketMQ Topic創(chuàng)建機制分為兩種:一種自動創(chuàng)建,一種手動創(chuàng)建??梢酝ㄟ^設(shè)置broker的配置文件來禁用或者允許自動創(chuàng)建。默認是開啟的允許自動創(chuàng)建
autoCreateTopicEnable=true/false
下面會結(jié)合源碼來深度分析一下自動創(chuàng)建和手動創(chuàng)建的過程。
2. 自動Topic
默認情況下,topic不用手動創(chuàng)建,當(dāng)producer進行消息發(fā)送時,會從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會默認拉取broker啟動時默認創(chuàng)建好名為“TBW102”的Topic,這定義在org.apache.rocketmq.common.MixAll類中
// Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自動創(chuàng)建開關(guān)是下BrokerConfig類中有一個私有變量:
@ImportantField private boolean autoCreateTopicEnable = true;
這變量可以通過配置文件配置來進行修改,代碼中的默認值為true,所以在默認的情況下Rocket MQ是會自動創(chuàng)建Topic的。
在Broker啟動,會調(diào)用TopicConfigManager的構(gòu)造方法,在構(gòu)造方法中定義了一系列RocketMQ系統(tǒng)內(nèi)置的一些系統(tǒng)Topic(這里只關(guān)注一下TBW102):
{ // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } }
這里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 這樣一段代碼,在開啟允許自動創(chuàng)建的時候,會把當(dāng)前Topic的信息存入topicConfigTable變量中。
然后通過發(fā)送定期發(fā)送心跳包把Topic和Broker的信息發(fā)送到NameServer的RouteInfoManager中進行保存。在BrokerController中定義了這樣的一個定時任務(wù)來執(zhí)行這個心跳包的發(fā)送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
這里就說明了如何把每個Broker的系統(tǒng)自定義的Topic注冊到NameServer。
接下來看在發(fā)送過程中如何從NameServer獲取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //省略代碼 //獲取路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); }
通過DefaultMQProducerImpl.tryToFindTopicPublishInfo方法獲取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //第一次從緩存中獲取--肯定沒有因為還沒創(chuàng)建 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //從NameServer獲取--也是沒有,因為沒有創(chuàng)建 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //第二次從這里獲取 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
下面來看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { //省略代碼 if (isDefault && defaultMQProducer != null) { //使用默認的TBW102 Topic獲取數(shù)據(jù) topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //這是正常的 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } //省略代碼 }
如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認路由信息,此時會獲取所有已開啟自動創(chuàng)建開關(guān)的broker的默認“TBW102”topic路由信息,并保存默認的topic消息隊列數(shù)量。
這里會比較一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默認值和TBW102中的值哪個更小。
if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } }
判斷獲取默認的是否存在,如果存在把當(dāng)前的Topic的信息更新。
也就是把TBW102 Topic的數(shù)據(jù)更新為自動創(chuàng)建的數(shù)據(jù)。
if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; }
更新本地的緩存。這樣TBW102 Topic的負載和一些默認的路由信息就會被自己創(chuàng)建的Topic使用。這里就是整個自動創(chuàng)建的過程.
總結(jié)一下就是:通過使用系統(tǒng)內(nèi)部的一個TBW102的Topic的配置來自動創(chuàng)建當(dāng)前用戶的要創(chuàng)建的自定義Topic。
3. 手動創(chuàng)建--預(yù)先創(chuàng)建
手動創(chuàng)建也叫預(yù)先創(chuàng)建,就是在使用Topic之前就創(chuàng)建,可以通過命令行或者通過RocketMQ的管理界面創(chuàng)建Topic。
通過界面控制臺創(chuàng)建
項目地址: github.com/apache/rock…
TopicController主要負責(zé)Topic的管理
@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST}) @ResponseBody public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) { Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()), "clusterName or brokerName can not be all blank"); logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest)); topicService.createOrUpdate(topicCreateOrUpdateRequest); return true; }
然后通過MQAdminExtImpl.createAndUpdateTopicConfig方法來創(chuàng)建:
@Override public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config); }
通過調(diào)用DefaultMQAdminExtImpl.createAndUpdateTopicConfig創(chuàng)建Topic
@Override public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); }
最后通過MQClientAPIImpl.createTopic創(chuàng)建Topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); }
以上就是RocketMQ源碼解析topic創(chuàng)建機制詳解的詳細內(nèi)容,更多關(guān)于RocketMQ topic創(chuàng)建的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA中將SpringBoot項目提交到git倉庫的方法步驟
本文主要介紹了IDEA中將SpringBoot項目提交到git倉庫的方法步驟,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12spring cloud中微服務(wù)之間的調(diào)用以及eureka的自我保護機制詳解
這篇文章主要介紹了spring cloud中微服務(wù)之間的調(diào)用以及eureka的自我保護機制詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-07-07JUC并發(fā)編程LinkedBlockingQueue隊列深入分析源碼
LinkedBlockingQueue 是一個可選有界阻塞隊列,這篇文章主要為大家詳細介紹了Java中LinkedBlockingQueue的實現(xiàn)原理與適用場景,感興趣的可以了解一下2023-04-04