欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ源碼解析broker?啟動(dòng)流程

 更新時(shí)間:2023年03月23日 11:09:25   作者:hsfxuebao  
這篇文章主要為大家介紹了RocketMQ源碼解析broker啟動(dòng)流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

1. 啟動(dòng)入口

本系列RocketMQ4.8注釋github地址,希望對(duì)大家有所幫助,要是覺(jué)得可以的話(huà)麻煩給點(diǎn)一下Star哈

前面我們已經(jīng)分析完了NameServerproducer,從本文開(kāi)始,我們將分析Broker

broker的啟動(dòng)類(lèi)為org.apache.rocketmq.broker.BrokerStartup,代碼如下:

public class BrokerStartup {
    ...
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    ...
}

main()方法中,僅有一行代碼,這行代碼包含了兩個(gè)操作:

  • createBrokerController(...):創(chuàng)建BrokerController
  • start(...):?jiǎn)?dòng)Broker

接下來(lái)我們就來(lái)分析這兩個(gè)操作。

2. 創(chuàng)建BrokerController

創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:

/**
 * 創(chuàng)建 broker 的配置參數(shù)
 */
public static BrokerController createBrokerController(String[] args) {
    ...
    try {
        //解析命令行參數(shù)
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }
        // 處理配置
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        // tls安全相關(guān)
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // 配置端口
        nettyServerConfig.setListenPort(10911);
        // 消息存儲(chǔ)的配置
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        ...
        // 將命令行中的配置設(shè)置到brokerConfig對(duì)象中
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        // 檢查環(huán)境變量:ROCKETMQ_HOME
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match 
                the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        //省略一些配置
        ...
        // 創(chuàng)建 brokerController
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        controller.getConfiguration().registerConfig(properties);
        // 初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);
            @Override
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        ...
                        // 這里會(huì)發(fā)一條注銷(xiāo)消息給nameServer
                        controller.shutdown();
                        ...
                    }
                }
            }
        }, "ShutdownHook"));
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

這個(gè)方法的代碼有點(diǎn)長(zhǎng),但功能并不多,總的來(lái)說(shuō)就三個(gè)功能:

  • 處理配置:主要是處理nettyServerConfignettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類(lèi)似,這里就不多說(shuō)了。
  • 創(chuàng)建及初始化controller:調(diào)用方法controller.initialize(),這塊內(nèi)容我們后面分析。
  • 注冊(cè)關(guān)閉鉤子:調(diào)用Runtime.getRuntime().addShutdownHook(...),可以在jvm進(jìn)程關(guān)閉前進(jìn)行一些操作。

2.1 controller實(shí)例化

BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進(jìn)行,我們先來(lái)看看它的構(gòu)造方法:

public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // 4個(gè)核心配置信息
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // 管理consumer消費(fèi)消息的offset
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // 管理topic配置
    this.topicConfigManager = new TopicConfigManager(this);
    // 處理 consumer 拉消息請(qǐng)求的
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // 消息送達(dá)的監(jiān)聽(tīng)器
    this.messageArrivingListener 
        = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    ...
    // 往外發(fā)消息的組件
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    ...
}

BrokerController的構(gòu)造方法很長(zhǎng),基本都是一些賦值操作,代碼中已列出關(guān)鍵項(xiàng),這些包括:

  • 核心配置賦值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個(gè)配置
  • ConsumerOffsetManager:管理consumer消費(fèi)消息位置的偏移量,偏移量表示消費(fèi)者組消費(fèi)該topic消息的位置,后面再消費(fèi)時(shí),就從該位置后消費(fèi),避免重復(fù)消費(fèi)消息,也避免了漏消費(fèi)消息。
  • topicConfigManagertopic配置管理器,就是用來(lái)管理topic配置的,如topic名稱(chēng),topic隊(duì)列數(shù)量
  • pullMessageProcessor:消息處理器,用來(lái)處理消費(fèi)者拉消息
  • messageArrivingListener:消息送達(dá)的監(jiān)聽(tīng)器,當(dāng)生產(chǎn)者的消息送達(dá)時(shí),由該監(jiān)聽(tīng)器監(jiān)聽(tīng)
  • brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊(cè)/注銷(xiāo)消息

以上這些組件的用處,這里先混個(gè)臉熟,我們后面再分析。

2.2 初始化controller

我們?cè)賮?lái)看看初始化操作,方法為BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {
    // 加載配置文件中的配置
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    if (result) {
        try {
            // 消息存儲(chǔ)管理組件,管理磁盤(pán)上的消息
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                    this.messageArrivingListener, this.brokerConfig);
            // 啟用了DLeger,就創(chuàng)建DLeger相關(guān)組件
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                ...
            }
            // broker統(tǒng)計(jì)組件
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(
                new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // 加載磁盤(pán)上的記錄,如commitLog寫(xiě)入的位置、消費(fèi)者主題/隊(duì)列的信息
    result = result && this.messageStore.load();
    if (result) {
        // 處理 nettyServer
        this.remotingServer = new NettyRemotingServer(
            this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(
            fastConfig, this.clientHousekeepingService);
        // 創(chuàng)建線程池start... 這里會(huì)創(chuàng)建多種類(lèi)型的線程池
        ...
        // 處理consumer pull操作的線程池
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        ...
        // 創(chuàng)建線程池end...
        // 注冊(cè)處理器
        this.registerProcessor();
        // 啟動(dòng)定時(shí)任務(wù)start... 這里會(huì)啟動(dòng)好多的定時(shí)任務(wù)
        ...
        // 定時(shí)將consumer消費(fèi)到的offset進(jìn)行持久化操作,即將數(shù)據(jù)保存到磁盤(pán)上
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        ...
        // 啟動(dòng)定時(shí)任務(wù)end...
        ...
        // 開(kāi)啟 DLeger 的一些操作
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            ...
        }
        // 處理tls配置
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            ...
        }
        // 初始化一些操作
        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}

這個(gè)還是很長(zhǎng),關(guān)鍵部分都做了注釋?zhuān)摲椒ㄋ龅墓ぷ魅缦拢?/p>

  • 加載配置文件中的配置
  • 賦值與初始化操作
  • 創(chuàng)建線程池
  • 注冊(cè)處理器
  • 啟動(dòng)定時(shí)任務(wù)

這里我們來(lái)看下注冊(cè)處理器的操作this.registerProcessor():

2.2.1 注冊(cè)處理器:BrokerController#registerProcessor

this.registerProcessor()實(shí)際調(diào)用的方法是BrokerController#registerProcessor,代碼如下:

public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
        this.sendMessageExecutor);
    ...
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
        this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    /**
        * ReplyMessageProcessor
        */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    ...
}

這個(gè)方法里注冊(cè)了許許多多的處理器,這里僅列出了與消息相關(guān)的內(nèi)容,如發(fā)送消息、回復(fù)消息、拉取消息等,后面在處理producer/consumer的消息時(shí),就會(huì)用到這些處理器,這里先不展開(kāi)分析。

2.2.2 remotingServer注冊(cè)處理器:NettyRemotingServer#registerProcessor

我們來(lái)看下remotingServer注冊(cè)處理器的操作,方法為NettyRemotingServer#registerProcessor

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    ...
    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
            ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }
        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                ExecutorService>(processor, executorThis);
        // 注冊(cè)到processorTable 中
        this.processorTable.put(requestCode, pair);
    }
    ...
}

最終,這些處理器注冊(cè)到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:

HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

這是一個(gè)hashMap的結(jié)構(gòu),keycodevaluePair,該類(lèi)中有兩個(gè)成員變量:NettyRequestProcessor、ExecutorService,codeNettyRequestProcessor的映射關(guān)系就是在hashMap里存儲(chǔ)的。

2.3 注冊(cè)關(guān)閉鉤子:Runtime.getRuntime().addShutdownHook(...)

接著我們來(lái)看看注冊(cè)關(guān)閉鉤子的操作:

// 關(guān)閉鉤子,在關(guān)閉前處理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    @Override
    public void run() {
        synchronized (this) {
            if (!this.hasShutdown) {
                ...
                // 這里會(huì)發(fā)一條注銷(xiāo)消息給nameServer
                controller.shutdown();
                ...
            }
        }
    }
}, "ShutdownHook"));

跟進(jìn)BrokerController#shutdown方法:

public void shutdown() {
    // 調(diào)用各組件的shutdown方法
    ...
    // 發(fā)送注銷(xiāo)消息到NameServer
    this.unregisterBrokerAll();
    ...
    // 持久化consumer的消費(fèi)偏移量
    this.consumerOffsetManager.persist();
    // 又是調(diào)用各組件的shutdown方法
    ...

這個(gè)方法里會(huì)調(diào)用各組件的shutdown()方法、發(fā)送注銷(xiāo)消息給NameServer、持久化consumer的消費(fèi)偏移量,這里我們主要看發(fā)送注銷(xiāo)消息的方法BrokerController#unregisterBrokerAll:

private void unregisterBrokerAll() {
    // 發(fā)送一條注銷(xiāo)消息給nameServer
    this.brokerOuterAPI.unregisterBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId());
}

繼續(xù)進(jìn)入BrokerOuterAPI#unregisterBrokerAll

public void unregisterBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) {
    // 獲取所有的 nameServer,遍歷發(fā)送注銷(xiāo)消息
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
            } catch (Exception e) {
                log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
            }
        }
    }
}

這個(gè)方法里,會(huì)獲取到所有的nameServer,然后逐個(gè)發(fā)送注銷(xiāo)消息,繼續(xù)進(jìn)入BrokerOuterAPI#unregisterBroker方法:

public void unregisterBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
        InterruptedException, MQBrokerException {
    UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    // 發(fā)送的注銷(xiāo)消息:RequestCode.UNREGISTER_BROKER
    RemotingCommand request = RemotingCommand.createRequestCommand(
            c, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}

最終調(diào)用的是RemotingClient#invokeSync進(jìn)行消息發(fā)送,請(qǐng)求codeRequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷(xiāo)消息對(duì)應(yīng)上了。

3. 啟動(dòng)Broker:start(...)

我們?cè)賮?lái)看看Broker的啟動(dòng)流程,處理方法為BrokerController#start

public void start() throws Exception {
    // 啟動(dòng)各組件
    // 啟動(dòng)消息存儲(chǔ)相關(guān)組件
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // 啟動(dòng) remotingServer,其實(shí)就是啟動(dòng)一個(gè)netty服務(wù),用來(lái)接收producer傳來(lái)的消息
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    ...
    // broker對(duì)外發(fā)放消息的組件,向nameServer上報(bào)存活消息時(shí)使用了它,也是一個(gè)netty服務(wù)
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    ...
    // broker 核心的心跳注冊(cè)任務(wù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, 
                    brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
        // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計(jì)算得到默認(rèn)30秒執(zhí)行一次
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
            TimeUnit.MILLISECONDS);
    ...
}

這個(gè)方法主要就是啟動(dòng)各組件了,這里列出了幾大重要組件的啟動(dòng):

  • messageStore:消息存儲(chǔ)組件,在這個(gè)組件里,會(huì)啟動(dòng)消息存儲(chǔ)相關(guān)的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等
  • remotingServernetty服務(wù),用來(lái)接收請(qǐng)求消息,如producer發(fā)送過(guò)來(lái)的消息
  • brokerOuterAPI:也是一個(gè)netty服務(wù),用來(lái)對(duì)外發(fā)送消息,如向nameServer上報(bào)心跳消息
  • 啟動(dòng)定時(shí)任務(wù):brokernameServer發(fā)送注冊(cè)消息

這里我們重點(diǎn)來(lái)看定時(shí)任務(wù)是如何發(fā)送心跳發(fā)送的。

處理注冊(cè)消息發(fā)送的時(shí)間間隔如下:

Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

這行代碼看著長(zhǎng),但意思就一句話(huà):時(shí)間間隔可以自行配置,但不能小于10s,不能大于60s,默認(rèn)是30s.

處理消息注冊(cè)的方法為BrokerController#registerBrokerAll(...),代碼如下:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper 
            = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 處理topic相關(guān)配置
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ...
    }
    // 這里會(huì)判斷是否需要進(jìn)行注冊(cè)
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 進(jìn)行注冊(cè)操作    
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

這個(gè)方法就是用來(lái)處理注冊(cè)操作的,不過(guò)注冊(cè)前會(huì)先驗(yàn)證下是否需要注冊(cè),驗(yàn)證是否需要注冊(cè)的方法為BrokerController#needRegister, 代碼如下:

private boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {
    TopicConfigSerializeWrapper topicConfigWrapper 
        = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 判斷是否需要進(jìn)行注冊(cè)
    List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
        brokerId, topicConfigWrapper, timeoutMills);
    // 有一個(gè)發(fā)生了變化,就表示需要注冊(cè)了    
    boolean needRegister = false;
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}

這個(gè)方法調(diào)用了brokerOuterAPI.needRegister(...)來(lái)判斷broker是否發(fā)生了變化,只要一個(gè)NameServer上發(fā)生了變化,就說(shuō)明需要進(jìn)行注冊(cè)操作。

brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進(jìn)BrokerOuterAPI#needRegister

public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    // 獲取所有的 nameServer
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // 遍歷所有的nameServer,逐一發(fā)送請(qǐng)求
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader requestHeader 
                            = new QueryDataVersionRequestHeader();
                        ...
                        // 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
                        RemotingCommand request = RemotingCommand
                            .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        // 把當(dāng)前的 DataVersion 發(fā)到 nameServer     
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        // 發(fā)請(qǐng)求到nameServer
                        RemotingCommand response = remotingClient
                            .invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                  (QueryDataVersionResponseHeader) response
                                  .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    // 拿到 DataVersion
                                    nameServerDataVersion = DataVersion.decode(body, D
                                        ataVersion.class);
                                    // 這里是判斷的關(guān)鍵
                                    if (!topicConfigWrapper.getDataVersion()
                                        .equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        ...
                    } catch (Exception e) {
                        ...
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}

這個(gè)方法里,先是遍歷所有的nameServer,向每個(gè)nameServer都發(fā)送一條codeRequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當(dāng)前brokerDataVersion,當(dāng)nameServer收到消息后,就返回nameServer中保存的、與當(dāng)前broker對(duì)應(yīng)的DataVersion,當(dāng)兩者版本不相等時(shí),就表明當(dāng)前broker發(fā)生了變化,需要重新注冊(cè)。

DataVersion是個(gè)啥呢?它的部分代碼如下:

public class DataVersion extends RemotingSerializable {
    // 時(shí)間戳
    private long timestamp = System.currentTimeMillis();
    // 計(jì)數(shù)器,可以理解為最近的版本號(hào)
    private AtomicLong counter = new AtomicLong(0);
    public void nextVersion() {
        this.timestamp = System.currentTimeMillis();
        this.counter.incrementAndGet();
    }
    /**
     * equals 方法,當(dāng) timestamp 與 counter 都相等時(shí),則兩者相等
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        final DataVersion that = (DataVersion) o;
        if (timestamp != that.timestamp) {
            return false;
        }
        if (counter != null && that.counter != null) {
            return counter.longValue() == that.counter.longValue();
        }
        return (null == counter) && (null == that.counter);
    }
    ...
} 

DataVersionequals()方法來(lái)看,只有當(dāng)timestampcounter都相等時(shí),兩個(gè)DataVersion對(duì)象才相等。那這兩個(gè)值會(huì)在哪里被修改呢?從DataVersion#nextVersion方法的調(diào)用情況來(lái)看,引起這兩個(gè)值的變化主要有兩種:

  • broker 上新創(chuàng)建了一個(gè) topic
  • topic的發(fā)了的變化

在這兩種情況下,DataVersion#nextVersion方法被調(diào)用,從而引起DataVersion的改變。DataVersion改變了,就表明當(dāng)前broker需要向nameServer注冊(cè)了。

讓我們?cè)倩氐?code>BrokerController#registerBrokerAll(...)方法:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {
    ...
    // 這里會(huì)判斷是否需要進(jìn)行注冊(cè)
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 進(jìn)行注冊(cè)操作    
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

處理注冊(cè)的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
    // 注冊(cè)
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        // 這個(gè)對(duì)象里就包含了當(dāng)前broker的版本信息
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
    ...
}

繼續(xù)跟下去,最終調(diào)用的是BrokerOuterAPI#registerBroker方法:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, 
    RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
    // 構(gòu)建請(qǐng)求
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    // 處理發(fā)送操作:sendOneWay
    if (oneway) {
        try {
            // 注冊(cè)操作
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
        ...
    }
    ....
}

所以,所謂的注冊(cè)操作,就是當(dāng)nameServer發(fā)送一條codeRequestCode.REGISTER_BROKER的消息,消息里會(huì)帶上當(dāng)前brokertopic信息、版本號(hào)等。

4.總結(jié)

本文主要分析了broker的啟動(dòng)流程,總的來(lái)說(shuō),啟動(dòng)流程分為3個(gè):

  • 解析配置文件,這一步會(huì)解析各種配置,并將其賦值到對(duì)應(yīng)的對(duì)象中
  • BrokerController創(chuàng)建及初始化:創(chuàng)建了BrokerController對(duì)象,并進(jìn)行初始化操作,所謂的初始化,就是加載配置文件中配置、創(chuàng)建線程池、注冊(cè)請(qǐng)求處理器、啟動(dòng)定時(shí)任務(wù)等
  • BrokerController啟動(dòng):這一步是啟動(dòng)broker的核心組件,如messageStore(消息存儲(chǔ))、remotingServer(netty服務(wù),用來(lái)處理producerconsumer請(qǐng)求)、brokerOuterAPI(netty服務(wù),用來(lái)向nameServer上報(bào)當(dāng)前broker信息)等。

在分析啟動(dòng)過(guò)程中,重點(diǎn)分析了兩類(lèi)消息的發(fā)送:

  • ShutdownHook中,broker會(huì)向nameServer發(fā)送注銷(xiāo)消息,這表明在broker關(guān)閉前,nameServer會(huì)清除當(dāng)前broker的注冊(cè)信息

  • broker啟動(dòng)后,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),定期判斷是否需要向nameServer注冊(cè),判斷是否需要注冊(cè)時(shí),會(huì)向nameServer發(fā)送codeQUERY_DATA_VERSION的消息,從nameServer得到當(dāng)前broker的版本號(hào),該版本號(hào)與本地版本號(hào)不一致,就表示需要向broker重新注冊(cè)了,即發(fā)送注冊(cè)消息。

參考文章

RocketMQ4.8注釋github地址

以上就是RocketMQ源碼解析broker 啟動(dòng)流程的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ broker啟動(dòng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 解決idea創(chuàng)建版本時(shí)只有Java21和Java17選項(xiàng)

    解決idea創(chuàng)建版本時(shí)只有Java21和Java17選項(xiàng)

    你是否在使用IntelliJ?IDEA創(chuàng)建新項(xiàng)目時(shí)遇到了只有Java?21和Java?17的選項(xiàng)?別擔(dān)心,我們的指南將為你提供解決方案,通過(guò)簡(jiǎn)單的步驟,你將能夠選擇你需要的任何Java版本,繼續(xù)閱讀,讓我們開(kāi)始吧!
    2024-03-03
  • springboot中的pom文件?project報(bào)錯(cuò)問(wèn)題

    springboot中的pom文件?project報(bào)錯(cuò)問(wèn)題

    這篇文章主要介紹了springboot中的pom文件?project報(bào)錯(cuò)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java日期與時(shí)間類(lèi)原理解析

    Java日期與時(shí)間類(lèi)原理解析

    這篇文章主要介紹了Java日期與時(shí)間類(lèi)原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-01-01
  • 使用java實(shí)現(xiàn)備份和恢復(fù)SQLServer表數(shù)據(jù)

    使用java實(shí)現(xiàn)備份和恢復(fù)SQLServer表數(shù)據(jù)

    這篇文章主要為大家詳細(xì)介紹了如何使用java實(shí)現(xiàn)備份和恢復(fù)SQLServer表數(shù)據(jù),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-01-01
  • Java使用Arrays.asList報(bào)UnsupportedOperationException的解決

    Java使用Arrays.asList報(bào)UnsupportedOperationException的解決

    這篇文章主要介紹了Java使用Arrays.asList報(bào)UnsupportedOperationException的解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Java Scala之模式匹配與隱式轉(zhuǎn)換

    Java Scala之模式匹配與隱式轉(zhuǎn)換

    在Java中我們有switch case default這三個(gè)組成的基礎(chǔ)語(yǔ)法,在Scala中我們是有match和case組成 default的作用由case代替,本文詳細(xì)介紹了Scala的模式匹配與隱式轉(zhuǎn)換,感興趣的可以參考本文
    2023-04-04
  • Java實(shí)現(xiàn)布隆過(guò)濾器的示例詳解

    Java實(shí)現(xiàn)布隆過(guò)濾器的示例詳解

    布隆過(guò)濾器(Bloom?Filter)是1970年由布隆提出來(lái)的,實(shí)際上是由一個(gè)很長(zhǎng)的二進(jìn)制數(shù)組+一系列hash算法映射函數(shù),用于判斷一個(gè)元素是否存在于集合中。本文主要介紹了Java實(shí)現(xiàn)布隆過(guò)濾器的示例代碼,希望對(duì)大家有所幫助
    2023-03-03
  • springmvc實(shí)現(xiàn)自定義類(lèi)型轉(zhuǎn)換器示例

    springmvc實(shí)現(xiàn)自定義類(lèi)型轉(zhuǎn)換器示例

    本篇文章主要介紹了springmvc實(shí)現(xiàn)自定義類(lèi)型轉(zhuǎn)換器示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-02-02
  • spring Boot查詢(xún)數(shù)據(jù)分頁(yè)顯示的方法實(shí)例

    spring Boot查詢(xún)數(shù)據(jù)分頁(yè)顯示的方法實(shí)例

    這篇文章主要給大家介紹了關(guān)于spring Boot查詢(xún)數(shù)據(jù)分頁(yè)顯示的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • springboot使用AOP+反射實(shí)現(xiàn)Excel數(shù)據(jù)的讀取

    springboot使用AOP+反射實(shí)現(xiàn)Excel數(shù)據(jù)的讀取

    本文主要介紹了springboot使用AOP+反射實(shí)現(xiàn)Excel數(shù)據(jù)的讀取,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-01-01

最新評(píng)論