RocketMQ源碼解析broker?啟動流程
1. 啟動入口
本系列RocketMQ4.8注釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈
前面我們已經(jīng)分析完了NameServer和producer,從本文開始,我們將分析Broker。
broker的啟動類為org.apache.rocketmq.broker.BrokerStartup,代碼如下:
public class BrokerStartup {
...
public static void main(String[] args) {
start(createBrokerController(args));
}
...
}
在main()方法中,僅有一行代碼,這行代碼包含了兩個操作:
createBrokerController(...):創(chuàng)建BrokerControllerstart(...):啟動Broker
接下來我們就來分析這兩個操作。
2. 創(chuàng)建BrokerController
創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:
/**
* 創(chuàng)建 broker 的配置參數(shù)
*/
public static BrokerController createBrokerController(String[] args) {
...
try {
//解析命令行參數(shù)
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// 處理配置
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// tls安全相關(guān)
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// 配置端口
nettyServerConfig.setListenPort(10911);
// 消息存儲的配置
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
...
// 將命令行中的配置設置到brokerConfig對象中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
// 檢查環(huán)境變量:ROCKETMQ_HOME
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match
the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//省略一些配置
...
// 創(chuàng)建 brokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
controller.getConfiguration().registerConfig(properties);
// 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 關(guān)閉鉤子,在關(guān)閉前處理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
...
// 這里會發(fā)一條注銷消息給nameServer
controller.shutdown();
...
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
這個方法的代碼有點長,但功能并不多,總的來說就三個功能:
- 處理配置:主要是處理
nettyServerConfig與nettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類似,這里就不多說了。 - 創(chuàng)建及初始化
controller:調(diào)用方法controller.initialize(),這塊內(nèi)容我們后面分析。 - 注冊關(guān)閉鉤子:調(diào)用
Runtime.getRuntime().addShutdownHook(...),可以在jvm進程關(guān)閉前進行一些操作。
2.1 controller實例化
BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進行,我們先來看看它的構(gòu)造方法:
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// 4個核心配置信息
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
// 管理consumer消費消息的offset
this.consumerOffsetManager = new ConsumerOffsetManager(this);
// 管理topic配置
this.topicConfigManager = new TopicConfigManager(this);
// 處理 consumer 拉消息請求的
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
// 消息送達的監(jiān)聽器
this.messageArrivingListener
= new NotifyMessageArrivingListener(this.pullRequestHoldService);
...
// 往外發(fā)消息的組件
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
...
}
BrokerController的構(gòu)造方法很長,基本都是一些賦值操作,代碼中已列出關(guān)鍵項,這些包括:
- 核心配置賦值:主要是
brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個配置 ConsumerOffsetManager:管理consumer消費消息位置的偏移量,偏移量表示消費者組消費該topic消息的位置,后面再消費時,就從該位置后消費,避免重復消費消息,也避免了漏消費消息。topicConfigManager:topic配置管理器,就是用來管理topic配置的,如topic名稱,topic隊列數(shù)量pullMessageProcessor:消息處理器,用來處理消費者拉消息messageArrivingListener:消息送達的監(jiān)聽器,當生產(chǎn)者的消息送達時,由該監(jiān)聽器監(jiān)聽brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊/注銷消息
以上這些組件的用處,這里先混個臉熟,我們后面再分析。
2.2 初始化controller
我們再來看看初始化操作,方法為BrokerController#initialize:
public boolean initialize() throws CloneNotSupportedException {
// 加載配置文件中的配置
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
// 消息存儲管理組件,管理磁盤上的消息
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig);
// 啟用了DLeger,就創(chuàng)建DLeger相關(guān)組件
if (messageStoreConfig.isEnableDLegerCommitLog()) {
...
}
// broker統(tǒng)計組件
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(
new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// 加載磁盤上的記錄,如commitLog寫入的位置、消費者主題/隊列的信息
result = result && this.messageStore.load();
if (result) {
// 處理 nettyServer
this.remotingServer = new NettyRemotingServer(
this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(
fastConfig, this.clientHousekeepingService);
// 創(chuàng)建線程池start... 這里會創(chuàng)建多種類型的線程池
...
// 處理consumer pull操作的線程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
...
// 創(chuàng)建線程池end...
// 注冊處理器
this.registerProcessor();
// 啟動定時任務start... 這里會啟動好多的定時任務
...
// 定時將consumer消費到的offset進行持久化操作,即將數(shù)據(jù)保存到磁盤上
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
...
// 啟動定時任務end...
...
// 開啟 DLeger 的一些操作
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
...
}
// 處理tls配置
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
...
}
// 初始化一些操作
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
這個還是很長,關(guān)鍵部分都做了注釋,該方法所做的工作如下:
- 加載配置文件中的配置
- 賦值與初始化操作
- 創(chuàng)建線程池
- 注冊處理器
- 啟動定時任務
這里我們來看下注冊處理器的操作this.registerProcessor():
2.2.1 注冊處理器:BrokerController#registerProcessor
this.registerProcessor()實際調(diào)用的方法是BrokerController#registerProcessor,代碼如下:
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
this.sendMessageExecutor);
...
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
...
}
這個方法里注冊了許許多多的處理器,這里僅列出了與消息相關(guān)的內(nèi)容,如發(fā)送消息、回復消息、拉取消息等,后面在處理producer/consumer的消息時,就會用到這些處理器,這里先不展開分析。
2.2.2 remotingServer注冊處理器:NettyRemotingServer#registerProcessor
我們來看下remotingServer注冊處理器的操作,方法為NettyRemotingServer#registerProcessor:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor,
ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,
ExecutorService>(processor, executorThis);
// 注冊到processorTable 中
this.processorTable.put(requestCode, pair);
}
...
}
最終,這些處理器注冊到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>
這是一個hashMap的結(jié)構(gòu),key為code,value為Pair,該類中有兩個成員變量:NettyRequestProcessor、ExecutorService,code與NettyRequestProcessor的映射關(guān)系就是在hashMap里存儲的。
2.3 注冊關(guān)閉鉤子:Runtime.getRuntime().addShutdownHook(...)
接著我們來看看注冊關(guān)閉鉤子的操作:
// 關(guān)閉鉤子,在關(guān)閉前處理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
...
// 這里會發(fā)一條注銷消息給nameServer
controller.shutdown();
...
}
}
}
}, "ShutdownHook"));
跟進BrokerController#shutdown方法:
public void shutdown() {
// 調(diào)用各組件的shutdown方法
...
// 發(fā)送注銷消息到NameServer
this.unregisterBrokerAll();
...
// 持久化consumer的消費偏移量
this.consumerOffsetManager.persist();
// 又是調(diào)用各組件的shutdown方法
...
這個方法里會調(diào)用各組件的shutdown()方法、發(fā)送注銷消息給NameServer、持久化consumer的消費偏移量,這里我們主要看發(fā)送注銷消息的方法BrokerController#unregisterBrokerAll:
private void unregisterBrokerAll() {
// 發(fā)送一條注銷消息給nameServer
this.brokerOuterAPI.unregisterBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId());
}
繼續(xù)進入BrokerOuterAPI#unregisterBrokerAll:
public void unregisterBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) {
// 獲取所有的 nameServer,遍歷發(fā)送注銷消息
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
for (String namesrvAddr : nameServerAddressList) {
try {
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
}
}
}
}
這個方法里,會獲取到所有的nameServer,然后逐個發(fā)送注銷消息,繼續(xù)進入BrokerOuterAPI#unregisterBroker方法:
public void unregisterBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
// 發(fā)送的注銷消息:RequestCode.UNREGISTER_BROKER
RemotingCommand request = RemotingCommand.createRequestCommand(
c, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
最終調(diào)用的是RemotingClient#invokeSync進行消息發(fā)送,請求code是RequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷消息對應上了。
3. 啟動Broker:start(...)
我們再來看看Broker的啟動流程,處理方法為BrokerController#start:
public void start() throws Exception {
// 啟動各組件
// 啟動消息存儲相關(guān)組件
if (this.messageStore != null) {
this.messageStore.start();
}
// 啟動 remotingServer,其實就是啟動一個netty服務,用來接收producer傳來的消息
if (this.remotingServer != null) {
this.remotingServer.start();
}
...
// broker對外發(fā)放消息的組件,向nameServer上報存活消息時使用了它,也是一個netty服務
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
...
// broker 核心的心跳注冊任務
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false,
brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
// brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計算得到默認30秒執(zhí)行一次
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
TimeUnit.MILLISECONDS);
...
}
這個方法主要就是啟動各組件了,這里列出了幾大重要組件的啟動:
messageStore:消息存儲組件,在這個組件里,會啟動消息存儲相關(guān)的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等remotingServer:netty服務,用來接收請求消息,如producer發(fā)送過來的消息brokerOuterAPI:也是一個netty服務,用來對外發(fā)送消息,如向nameServer上報心跳消息- 啟動定時任務:
broker向nameServer發(fā)送注冊消息
這里我們重點來看定時任務是如何發(fā)送心跳發(fā)送的。
處理注冊消息發(fā)送的時間間隔如下:
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)
這行代碼看著長,但意思就一句話:時間間隔可以自行配置,但不能小于10s,不能大于60s,默認是30s.
處理消息注冊的方法為BrokerController#registerBrokerAll(...),代碼如下:
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 處理topic相關(guān)配置
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
...
}
// 這里會判斷是否需要進行注冊
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 進行注冊操作
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
這個方法就是用來處理注冊操作的,不過注冊前會先驗證下是否需要注冊,驗證是否需要注冊的方法為BrokerController#needRegister, 代碼如下:
private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 判斷是否需要進行注冊
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,
brokerId, topicConfigWrapper, timeoutMills);
// 有一個發(fā)生了變化,就表示需要注冊了
boolean needRegister = false;
for (Boolean changed : changeList) {
if (changed) {
needRegister = true;
break;
}
}
return needRegister;
}
這個方法調(diào)用了brokerOuterAPI.needRegister(...)來判斷broker是否發(fā)生了變化,只要一個NameServer上發(fā)生了變化,就說明需要進行注冊操作。
brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進BrokerOuterAPI#needRegister:
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final TopicConfigSerializeWrapper topicConfigWrapper,
final int timeoutMills) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
// 獲取所有的 nameServer
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 遍歷所有的nameServer,逐一發(fā)送請求
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
QueryDataVersionRequestHeader requestHeader
= new QueryDataVersionRequestHeader();
...
// 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
// 把當前的 DataVersion 發(fā)到 nameServer
request.setBody(topicConfigWrapper.getDataVersion().encode());
// 發(fā)請求到nameServer
RemotingCommand response = remotingClient
.invokeSync(namesrvAddr, request, timeoutMills);
DataVersion nameServerDataVersion = null;
Boolean changed = false;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader queryDataVersionResponseHeader =
(QueryDataVersionResponseHeader) response
.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed = queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if (body != null) {
// 拿到 DataVersion
nameServerDataVersion = DataVersion.decode(body, D
ataVersion.class);
// 這里是判斷的關(guān)鍵
if (!topicConfigWrapper.getDataVersion()
.equals(nameServerDataVersion)) {
changed = true;
}
}
if (changed == null || changed) {
changedList.add(Boolean.TRUE);
}
}
default:
break;
}
...
} catch (Exception e) {
...
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("query dataversion from nameserver countDownLatch await Exception", e);
}
}
return changedList;
}
這個方法里,先是遍歷所有的nameServer,向每個nameServer都發(fā)送一條code為RequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當前broker的DataVersion,當nameServer收到消息后,就返回nameServer中保存的、與當前broker對應的DataVersion,當兩者版本不相等時,就表明當前broker發(fā)生了變化,需要重新注冊。
DataVersion是個啥呢?它的部分代碼如下:
public class DataVersion extends RemotingSerializable {
// 時間戳
private long timestamp = System.currentTimeMillis();
// 計數(shù)器,可以理解為最近的版本號
private AtomicLong counter = new AtomicLong(0);
public void nextVersion() {
this.timestamp = System.currentTimeMillis();
this.counter.incrementAndGet();
}
/**
* equals 方法,當 timestamp 與 counter 都相等時,則兩者相等
*/
@Override
public boolean equals(final Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
final DataVersion that = (DataVersion) o;
if (timestamp != that.timestamp) {
return false;
}
if (counter != null && that.counter != null) {
return counter.longValue() == that.counter.longValue();
}
return (null == counter) && (null == that.counter);
}
...
}
從DataVersion的equals()方法來看,只有當timestamp與counter都相等時,兩個DataVersion對象才相等。那這兩個值會在哪里被修改呢?從DataVersion#nextVersion方法的調(diào)用情況來看,引起這兩個值的變化主要有兩種:
broker上新創(chuàng)建了一個topictopic的發(fā)了的變化
在這兩種情況下,DataVersion#nextVersion方法被調(diào)用,從而引起DataVersion的改變。DataVersion改變了,就表明當前broker需要向nameServer注冊了。
讓我們再回到BrokerController#registerBrokerAll(...)方法:
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {
...
// 這里會判斷是否需要進行注冊
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 進行注冊操作
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
處理注冊的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// 注冊
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
// 這個對象里就包含了當前broker的版本信息
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
...
}
繼續(xù)跟下去,最終調(diào)用的是BrokerOuterAPI#registerBroker方法:
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// 構(gòu)建請求
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// 處理發(fā)送操作:sendOneWay
if (oneway) {
try {
// 注冊操作
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
...
}
....
}
所以,所謂的注冊操作,就是當nameServer發(fā)送一條code為RequestCode.REGISTER_BROKER的消息,消息里會帶上當前broker的topic信息、版本號等。
4.總結(jié)
本文主要分析了broker的啟動流程,總的來說,啟動流程分為3個:
- 解析配置文件,這一步會解析各種配置,并將其賦值到對應的對象中
BrokerController創(chuàng)建及初始化:創(chuàng)建了BrokerController對象,并進行初始化操作,所謂的初始化,就是加載配置文件中配置、創(chuàng)建線程池、注冊請求處理器、啟動定時任務等BrokerController啟動:這一步是啟動broker的核心組件,如messageStore(消息存儲)、remotingServer(netty服務,用來處理producer與consumer請求)、brokerOuterAPI(netty服務,用來向nameServer上報當前broker信息)等。
在分析啟動過程中,重點分析了兩類消息的發(fā)送:
在
ShutdownHook中,broker會向nameServer發(fā)送注銷消息,這表明在broker關(guān)閉前,nameServer會清除當前broker的注冊信息broker啟動后,會啟動一個定時任務,定期判斷是否需要向nameServer注冊,判斷是否需要注冊時,會向nameServer發(fā)送code為QUERY_DATA_VERSION的消息,從nameServer得到當前broker的版本號,該版本號與本地版本號不一致,就表示需要向broker重新注冊了,即發(fā)送注冊消息。
參考文章
以上就是RocketMQ源碼解析broker 啟動流程的詳細內(nèi)容,更多關(guān)于RocketMQ broker啟動的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決idea創(chuàng)建版本時只有Java21和Java17選項
你是否在使用IntelliJ?IDEA創(chuàng)建新項目時遇到了只有Java?21和Java?17的選項?別擔心,我們的指南將為你提供解決方案,通過簡單的步驟,你將能夠選擇你需要的任何Java版本,繼續(xù)閱讀,讓我們開始吧!2024-03-03
使用java實現(xiàn)備份和恢復SQLServer表數(shù)據(jù)
這篇文章主要為大家詳細介紹了如何使用java實現(xiàn)備份和恢復SQLServer表數(shù)據(jù),文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2024-01-01
Java使用Arrays.asList報UnsupportedOperationException的解決
這篇文章主要介紹了Java使用Arrays.asList報UnsupportedOperationException的解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
springmvc實現(xiàn)自定義類型轉(zhuǎn)換器示例
本篇文章主要介紹了springmvc實現(xiàn)自定義類型轉(zhuǎn)換器示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02
spring Boot查詢數(shù)據(jù)分頁顯示的方法實例
這篇文章主要給大家介紹了關(guān)于spring Boot查詢數(shù)據(jù)分頁顯示的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用spring Boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2020-08-08
springboot使用AOP+反射實現(xiàn)Excel數(shù)據(jù)的讀取
本文主要介紹了springboot使用AOP+反射實現(xiàn)Excel數(shù)據(jù)的讀取,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01

