欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ源碼解析topic創(chuàng)建機制詳解

 更新時間:2022年08月19日 14:33:59   作者:螞蟻背大象  
這篇文章主要為大家介紹了RocketMQ源碼解析topic創(chuàng)建機制詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

1. RocketMQ Topic創(chuàng)建機制

以下源碼基于Rocket MQ 4.7.0

RocketMQ Topic創(chuàng)建機制分為兩種:一種自動創(chuàng)建,一種手動創(chuàng)建??梢酝ㄟ^設(shè)置broker的配置文件來禁用或者允許自動創(chuàng)建。默認是開啟的允許自動創(chuàng)建

autoCreateTopicEnable=true/false

下面會結(jié)合源碼來深度分析一下自動創(chuàng)建和手動創(chuàng)建的過程。

2. 自動Topic

默認情況下,topic不用手動創(chuàng)建,當(dāng)producer進行消息發(fā)送時,會從nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么會默認拉取broker啟動時默認創(chuàng)建好名為“TBW102”的Topic,這定義在org.apache.rocketmq.common.MixAll類中

// Will be created at broker when isAutoCreateTopicEnable
 public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; 

自動創(chuàng)建開關(guān)是下BrokerConfig類中有一個私有變量:

@ImportantField
private boolean autoCreateTopicEnable = true;

這變量可以通過配置文件配置來進行修改,代碼中的默認值為true,所以在默認的情況下Rocket MQ是會自動創(chuàng)建Topic的。

在Broker啟動,會調(diào)用TopicConfigManager的構(gòu)造方法,在構(gòu)造方法中定義了一系列RocketMQ系統(tǒng)內(nèi)置的一些系統(tǒng)Topic(這里只關(guān)注一下TBW102):

{
    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums()); //8
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

這里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 這樣一段代碼,在開啟允許自動創(chuàng)建的時候,會把當(dāng)前Topic的信息存入topicConfigTable變量中。

然后通過發(fā)送定期發(fā)送心跳包把Topic和Broker的信息發(fā)送到NameServer的RouteInfoManager中進行保存。在BrokerController中定義了這樣的一個定時任務(wù)來執(zhí)行這個心跳包的發(fā)送:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

這里就說明了如何把每個Broker的系統(tǒng)自定義的Topic注冊到NameServer。

接下來看在發(fā)送過程中如何從NameServer獲取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //省略代碼
        //獲取路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    }

通過DefaultMQProducerImpl.tryToFindTopicPublishInfo方法獲取Topic的路由信息。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //第一次從緩存中獲取--肯定沒有因為還沒創(chuàng)建
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //從NameServer獲取--也是沒有,因為沒有創(chuàng)建
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //第二次從這里獲取
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

下面來看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:

 public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略代碼
    if (isDefault && defaultMQProducer != null) {
            //使用默認的TBW102 Topic獲取數(shù)據(jù)
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                if (topicRouteData != null) {
                    for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        //這是正常的
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
      //省略代碼      
    }

如果isDefault=true并且defaultMQProducer不為空,從nameserver中獲取默認路由信息,此時會獲取所有已開啟自動創(chuàng)建開關(guān)的broker的默認“TBW102”topic路由信息,并保存默認的topic消息隊列數(shù)量。

這里會比較一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默認值和TBW102中的值哪個更小。

 if (topicRouteData != null) {
        TopicRouteData old = this.topicRouteTable.get(topic);
        boolean changed = topicRouteDataIsChange(old, topicRouteData);
        if (!changed) {
            changed = this.isNeedUpdateTopicRouteInfo(topic);
        } else {
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
        }
 }

判斷獲取默認的是否存在,如果存在把當(dāng)前的Topic的信息更新。

也就是把TBW102 Topic的數(shù)據(jù)更新為自動創(chuàng)建的數(shù)據(jù)。

 if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }
    // Update Pub info
    {
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }
    	// Update sub info
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}

更新本地的緩存。這樣TBW102 Topic的負載和一些默認的路由信息就會被自己創(chuàng)建的Topic使用。這里就是整個自動創(chuàng)建的過程.

總結(jié)一下就是:通過使用系統(tǒng)內(nèi)部的一個TBW102的Topic的配置來自動創(chuàng)建當(dāng)前用戶的要創(chuàng)建的自定義Topic。

3. 手動創(chuàng)建--預(yù)先創(chuàng)建

手動創(chuàng)建也叫預(yù)先創(chuàng)建,就是在使用Topic之前就創(chuàng)建,可以通過命令行或者通過RocketMQ的管理界面創(chuàng)建Topic。

通過界面控制臺創(chuàng)建

項目地址: github.com/apache/rock…

TopicController主要負責(zé)Topic的管理

@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
    Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
            "clusterName or brokerName can not be all blank");
    logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
    topicService.createOrUpdate(topicCreateOrUpdateRequest);
    return true;
}

然后通過MQAdminExtImpl.createAndUpdateTopicConfig方法來創(chuàng)建:

    @Override
    public void createAndUpdateTopicConfig(String addr, TopicConfig config)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
    }

通過調(diào)用DefaultMQAdminExtImpl.createAndUpdateTopicConfig創(chuàng)建Topic

@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}

最后通過MQClientAPIImpl.createTopic創(chuàng)建Topic

    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
        final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

以上就是RocketMQ源碼解析topic創(chuàng)建機制詳解的詳細內(nèi)容,更多關(guān)于RocketMQ topic創(chuàng)建的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論