欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ源碼解析topic創(chuàng)建機(jī)制詳解

 更新時(shí)間:2022年08月19日 14:33:59   作者:螞蟻背大象  
這篇文章主要為大家介紹了RocketMQ源碼解析topic創(chuàng)建機(jī)制詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

1. RocketMQ Topic創(chuàng)建機(jī)制

以下源碼基于Rocket MQ 4.7.0

RocketMQ Topic創(chuàng)建機(jī)制分為兩種:一種自動(dòng)創(chuàng)建,一種手動(dòng)創(chuàng)建??梢酝ㄟ^(guò)設(shè)置broker的配置文件來(lái)禁用或者允許自動(dòng)創(chuàng)建。默認(rèn)是開(kāi)啟的允許自動(dòng)創(chuàng)建

autoCreateTopicEnable=true/false

下面會(huì)結(jié)合源碼來(lái)深度分析一下自動(dòng)創(chuàng)建和手動(dòng)創(chuàng)建的過(guò)程。

2. 自動(dòng)Topic

默認(rèn)情況下,topic不用手動(dòng)創(chuàng)建,當(dāng)producer進(jìn)行消息發(fā)送時(shí),會(huì)從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會(huì)默認(rèn)拉取broker啟動(dòng)時(shí)默認(rèn)創(chuàng)建好名為“TBW102”的Topic,這定義在org.apache.rocketmq.common.MixAll類(lèi)中

// Will be created at broker when isAutoCreateTopicEnable
 public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; 

自動(dòng)創(chuàng)建開(kāi)關(guān)是下BrokerConfig類(lèi)中有一個(gè)私有變量:

@ImportantField
private boolean autoCreateTopicEnable = true;

這變量可以通過(guò)配置文件配置來(lái)進(jìn)行修改,代碼中的默認(rèn)值為true,所以在默認(rèn)的情況下Rocket MQ是會(huì)自動(dòng)創(chuàng)建Topic的。

在Broker啟動(dòng),會(huì)調(diào)用TopicConfigManager的構(gòu)造方法,在構(gòu)造方法中定義了一系列RocketMQ系統(tǒng)內(nèi)置的一些系統(tǒng)Topic(這里只關(guān)注一下TBW102):

{
    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

這里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 這樣一段代碼,在開(kāi)啟允許自動(dòng)創(chuàng)建的時(shí)候,會(huì)把當(dāng)前Topic的信息存入topicConfigTable變量中。

然后通過(guò)發(fā)送定期發(fā)送心跳包把Topic和Broker的信息發(fā)送到NameServer的RouteInfoManager中進(jìn)行保存。在BrokerController中定義了這樣的一個(gè)定時(shí)任務(wù)來(lái)執(zhí)行這個(gè)心跳包的發(fā)送:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

這里就說(shuō)明了如何把每個(gè)Broker的系統(tǒng)自定義的Topic注冊(cè)到NameServer。

接下來(lái)看在發(fā)送過(guò)程中如何從NameServer獲取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //省略代碼
        //獲取路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    }

通過(guò)DefaultMQProducerImpl.tryToFindTopicPublishInfo方法獲取Topic的路由信息。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //第一次從緩存中獲取--肯定沒(méi)有因?yàn)檫€沒(méi)創(chuàng)建
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //從NameServer獲取--也是沒(méi)有,因?yàn)闆](méi)有創(chuàng)建
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //第二次從這里獲取
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

下面來(lái)看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:

 public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略代碼
    if (isDefault && defaultMQProducer != null) {
            //使用默認(rèn)的TBW102 Topic獲取數(shù)據(jù)
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                if (topicRouteData != null) {
                    for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        //這是正常的
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
      //省略代碼      
    }

如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認(rèn)路由信息,此時(shí)會(huì)獲取所有已開(kāi)啟自動(dòng)創(chuàng)建開(kāi)關(guān)的broker的默認(rèn)“TBW102”topic路由信息,并保存默認(rèn)的topic消息隊(duì)列數(shù)量。

這里會(huì)比較一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默認(rèn)值和TBW102中的值哪個(gè)更小。

 if (topicRouteData != null) {
        TopicRouteData old = this.topicRouteTable.get(topic);
        boolean changed = topicRouteDataIsChange(old, topicRouteData);
        if (!changed) {
            changed = this.isNeedUpdateTopicRouteInfo(topic);
        } else {
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
        }
 }

判斷獲取默認(rèn)的是否存在,如果存在把當(dāng)前的Topic的信息更新。

也就是把TBW102 Topic的數(shù)據(jù)更新為自動(dòng)創(chuàng)建的數(shù)據(jù)。

 if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }
    // Update Pub info
    {
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }
    	// Update sub info
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

更新本地的緩存。這樣TBW102 Topic的負(fù)載和一些默認(rèn)的路由信息就會(huì)被自己創(chuàng)建的Topic使用。這里就是整個(gè)自動(dòng)創(chuàng)建的過(guò)程.

總結(jié)一下就是:通過(guò)使用系統(tǒng)內(nèi)部的一個(gè)TBW102的Topic的配置來(lái)自動(dòng)創(chuàng)建當(dāng)前用戶(hù)的要?jiǎng)?chuàng)建的自定義Topic。

3. 手動(dòng)創(chuàng)建--預(yù)先創(chuàng)建

手動(dòng)創(chuàng)建也叫預(yù)先創(chuàng)建,就是在使用Topic之前就創(chuàng)建,可以通過(guò)命令行或者通過(guò)RocketMQ的管理界面創(chuàng)建Topic。

通過(guò)界面控制臺(tái)創(chuàng)建

項(xiàng)目地址: github.com/apache/rock…

TopicController主要負(fù)責(zé)Topic的管理

@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
    Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
            "clusterName or brokerName can not be all blank");
    logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
    topicService.createOrUpdate(topicCreateOrUpdateRequest);
    return true;
}

然后通過(guò)MQAdminExtImpl.createAndUpdateTopicConfig方法來(lái)創(chuàng)建:

    @Override
    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
    }

通過(guò)調(diào)用DefaultMQAdminExtImpl.createAndUpdateTopicConfig創(chuàng)建Topic

@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}

最后通過(guò)MQClientAPIImpl.createTopic創(chuàng)建Topic

    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
        final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

以上就是RocketMQ源碼解析topic創(chuàng)建機(jī)制詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ topic創(chuàng)建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論