RocketMQ中的NameServer詳細解析
前言
NameServer是一個非常簡單的Topic路由注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。
Producer和Conumser通過NameServer可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。
NameServer各實例間相互不進行信息通訊,因此不能保證NameServer的一致性(Consistency),可以保證可用性(Availability)。
即選擇了CAP中的AP。NameServer只能保證最終一致性,關于怎么保證最終一致性后文再講。
現(xiàn)在先從NameServer的啟動開始。
NameServer為namesrv模塊

NamesrvStartup
public static NamesrvController main0(String[] args) {
//構造NamesrvController
NamesrvController controller = createNamesrvController(args);
start(controller);
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
boolean initResult = controller.initialize();
//啟動
controller.start();
return controller;
}
NamesrvStartup作為NameServer的啟動類,主要做了三件事:
- 構造NamesrvController(NameServer控制器)
- 加載初始化,由NamesrvController負責
- 啟動remotingServer,開啟Netty服務
NamesrvController
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//nameSrv的配置
private final NamesrvConfig namesrvConfig;
//netty的配置
private final NettyServerConfig nettyServerConfig;
//執(zhí)行單線程的任務調(diào)度,自定義編程名稱
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
//kv的配置管理
private final KVConfigManager kvConfigManager;
//路由管理器,有broker的ip和隊列信息,producer發(fā)送的queue信息,consumer的pull的queue信息
private final RouteInfoManager routeInfoManager;
//namesrv的netty的服務端實現(xiàn)
private RemotingServer remotingServer;
//處理接受到請求事件的回調(diào)監(jiān)聽服務,主要處理netty的事件
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//構造kv配置的管理
this.kvConfigManager = new KVConfigManager(this);
//構造路由信息管理
this.routeInfoManager = new RouteInfoManager();
//構造網(wǎng)絡連接事件管理
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
//配置
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}NamesrvController作為一個控制器主要負責NettyServer的創(chuàng)建,注冊requestProcessor,啟動NettyServer及各種task
public boolean initialize() {
//加載kv配置
this.kvConfigManager.load();
//初始化NettyServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//執(zhí)行器,用于接受請求并進行處理
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注冊Netty的處理器,即remotingExecutor
this.registerProcessor();
//進行掃描未活躍的Broker的任務
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//...省略其他代碼
return true;
}
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
//注冊request處理器
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
public void start() throws Exception {
//啟動netty server
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}NettyRequestProcessor
NettyRequestProcessor為請求處理器,上一步注冊的處理器,一般使用默認的處理器DefaultRequestProcessor,processRequest方法處理請求。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//...
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
//...省略其他代碼
}
return null;
}到此這篇關于RocketMQ中的NameServer詳細解析的文章就介紹到這了,更多相關RocketMQ中的NameServer內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java多線程批量處理百萬級的數(shù)據(jù)方法示例
這篇文章主要介紹了java多線程批量處理百萬級的數(shù)據(jù)的相關資料,文中通過代碼介紹的非常詳細,對大家學習或者使用java多線程具有一定的參考借鑒價值,需要的朋友可以參考下2025-02-02
淺談java中Math.random()與java.util.random()的區(qū)別
下面小編就為大家?guī)硪黄獪\談java中Math.random()與java.util.random()的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09
POI XSSFSheet shiftRows bug問題解決
這篇文章主要介紹了POI XSSFSheet shiftRows bug問題解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-07-07
通過代理類實現(xiàn)java連接數(shù)據(jù)庫(使用dao層操作數(shù)據(jù))實例分享
java通過代理類實現(xiàn)數(shù)據(jù)庫DAO操作代碼分享,大家參考使用吧2013-12-12
spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解
這篇文章主要給大家介紹了關于spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 聲明的相關資料,需要的朋友可以參考下2020-08-08
Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式
這篇文章主要介紹了Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式,本文通過實例圖文相結(jié)合給大家介紹的非常詳細,需要的朋友可以參考下2021-07-07
RabbitMQ的Direct Exchange模式實現(xiàn)的消息發(fā)布案例(示例代碼)
本文介紹了RabbitMQ的DirectExchange模式下的消息發(fā)布和消費的實現(xiàn),詳細說明了如何在DirectExchange模式中進行消息的發(fā)送和接收,以及消息處理的基本方法,感興趣的朋友跟隨小編一起看看吧2024-09-09

