RocketMQ源碼解析topic創(chuàng)建機(jī)制詳解
1. RocketMQ Topic創(chuàng)建機(jī)制
以下源碼基于Rocket MQ 4.7.0
RocketMQ Topic創(chuàng)建機(jī)制分為兩種:一種自動(dòng)創(chuàng)建,一種手動(dòng)創(chuàng)建??梢酝ㄟ^(guò)設(shè)置broker的配置文件來(lái)禁用或者允許自動(dòng)創(chuàng)建。默認(rèn)是開(kāi)啟的允許自動(dòng)創(chuàng)建
autoCreateTopicEnable=true/false
下面會(huì)結(jié)合源碼來(lái)深度分析一下自動(dòng)創(chuàng)建和手動(dòng)創(chuàng)建的過(guò)程。
2. 自動(dòng)Topic
默認(rèn)情況下,topic不用手動(dòng)創(chuàng)建,當(dāng)producer進(jìn)行消息發(fā)送時(shí),會(huì)從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會(huì)默認(rèn)拉取broker啟動(dòng)時(shí)默認(rèn)創(chuàng)建好名為“TBW102”的Topic,這定義在org.apache.rocketmq.common.MixAll類(lèi)中
// Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自動(dòng)創(chuàng)建開(kāi)關(guān)是下BrokerConfig類(lèi)中有一個(gè)私有變量:
@ImportantField private boolean autoCreateTopicEnable = true;
這變量可以通過(guò)配置文件配置來(lái)進(jìn)行修改,代碼中的默認(rèn)值為true,所以在默認(rèn)的情況下Rocket MQ是會(huì)自動(dòng)創(chuàng)建Topic的。
在Broker啟動(dòng),會(huì)調(diào)用TopicConfigManager的構(gòu)造方法,在構(gòu)造方法中定義了一系列RocketMQ系統(tǒng)內(nèi)置的一些系統(tǒng)Topic(這里只關(guān)注一下TBW102):
{ // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } }
這里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 這樣一段代碼,在開(kāi)啟允許自動(dòng)創(chuàng)建的時(shí)候,會(huì)把當(dāng)前Topic的信息存入topicConfigTable變量中。
然后通過(guò)發(fā)送定期發(fā)送心跳包把Topic和Broker的信息發(fā)送到NameServer的RouteInfoManager中進(jìn)行保存。在BrokerController中定義了這樣的一個(gè)定時(shí)任務(wù)來(lái)執(zhí)行這個(gè)心跳包的發(fā)送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
這里就說(shuō)明了如何把每個(gè)Broker的系統(tǒng)自定義的Topic注冊(cè)到NameServer。
接下來(lái)看在發(fā)送過(guò)程中如何從NameServer獲取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //省略代碼 //獲取路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); }
通過(guò)DefaultMQProducerImpl.tryToFindTopicPublishInfo方法獲取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //第一次從緩存中獲取--肯定沒(méi)有因?yàn)檫€沒(méi)創(chuàng)建 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //從NameServer獲取--也是沒(méi)有,因?yàn)闆](méi)有創(chuàng)建 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //第二次從這里獲取 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
下面來(lái)看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { //省略代碼 if (isDefault && defaultMQProducer != null) { //使用默認(rèn)的TBW102 Topic獲取數(shù)據(jù) topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //這是正常的 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } //省略代碼 }
如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認(rèn)路由信息,此時(shí)會(huì)獲取所有已開(kāi)啟自動(dòng)創(chuàng)建開(kāi)關(guān)的broker的默認(rèn)“TBW102”topic路由信息,并保存默認(rèn)的topic消息隊(duì)列數(shù)量。
這里會(huì)比較一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默認(rèn)值和TBW102中的值哪個(gè)更小。
if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } }
判斷獲取默認(rèn)的是否存在,如果存在把當(dāng)前的Topic的信息更新。
也就是把TBW102 Topic的數(shù)據(jù)更新為自動(dòng)創(chuàng)建的數(shù)據(jù)。
if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; }
更新本地的緩存。這樣TBW102 Topic的負(fù)載和一些默認(rèn)的路由信息就會(huì)被自己創(chuàng)建的Topic使用。這里就是整個(gè)自動(dòng)創(chuàng)建的過(guò)程.
總結(jié)一下就是:通過(guò)使用系統(tǒng)內(nèi)部的一個(gè)TBW102的Topic的配置來(lái)自動(dòng)創(chuàng)建當(dāng)前用戶(hù)的要?jiǎng)?chuàng)建的自定義Topic。
3. 手動(dòng)創(chuàng)建--預(yù)先創(chuàng)建
手動(dòng)創(chuàng)建也叫預(yù)先創(chuàng)建,就是在使用Topic之前就創(chuàng)建,可以通過(guò)命令行或者通過(guò)RocketMQ的管理界面創(chuàng)建Topic。
通過(guò)界面控制臺(tái)創(chuàng)建
項(xiàng)目地址: github.com/apache/rock…
TopicController主要負(fù)責(zé)Topic的管理
@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST}) @ResponseBody public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) { Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()), "clusterName or brokerName can not be all blank"); logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest)); topicService.createOrUpdate(topicCreateOrUpdateRequest); return true; }
然后通過(guò)MQAdminExtImpl.createAndUpdateTopicConfig方法來(lái)創(chuàng)建:
@Override public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config); }
通過(guò)調(diào)用DefaultMQAdminExtImpl.createAndUpdateTopicConfig創(chuàng)建Topic
@Override public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis); }
最后通過(guò)MQClientAPIImpl.createTopic創(chuàng)建Topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); }
以上就是RocketMQ源碼解析topic創(chuàng)建機(jī)制詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ topic創(chuàng)建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA中將SpringBoot項(xiàng)目提交到git倉(cāng)庫(kù)的方法步驟
本文主要介紹了IDEA中將SpringBoot項(xiàng)目提交到git倉(cāng)庫(kù)的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12詳解如何用Java實(shí)現(xiàn)對(duì)m3u8直播流抽幀
抽幀(frame extraction)是指從視頻流中提取一些特定的幀,通常是關(guān)鍵幀或者隨機(jī)幀,以供后續(xù)處理。這篇文章主要為大家介紹了如何用Java實(shí)現(xiàn)對(duì)m3u8直播流抽幀,需要的可以參考一下2023-03-03spring cloud中微服務(wù)之間的調(diào)用以及eureka的自我保護(hù)機(jī)制詳解
這篇文章主要介紹了spring cloud中微服務(wù)之間的調(diào)用以及eureka的自我保護(hù)機(jī)制詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07Java中常用的設(shè)計(jì)模式之責(zé)任鏈模式詳解
這篇文章主要為大家詳細(xì)介紹了Java中常用的設(shè)計(jì)模式之責(zé)任鏈模式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-02-02java中vector與hashtable操作實(shí)例分享
java中vector與hashtable操作實(shí)例,有需要的朋友可以參考一下2014-01-01JUC并發(fā)編程LinkedBlockingQueue隊(duì)列深入分析源碼
LinkedBlockingQueue 是一個(gè)可選有界阻塞隊(duì)列,這篇文章主要為大家詳細(xì)介紹了Java中LinkedBlockingQueue的實(shí)現(xiàn)原理與適用場(chǎng)景,感興趣的可以了解一下2023-04-04