RocketMQ中的NameServer詳細解析
前言
NameServer是一個非常簡單的Topic路由注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。
Producer和Conumser通過NameServer可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。
NameServer各實例間相互不進行信息通訊,因此不能保證NameServer的一致性(Consistency),可以保證可用性(Availability)。
即選擇了CAP中的AP。NameServer只能保證最終一致性,關(guān)于怎么保證最終一致性后文再講。
現(xiàn)在先從NameServer的啟動開始。
NameServer為namesrv模塊
NamesrvStartup
public static NamesrvController main0(String[] args) { //構(gòu)造NamesrvController NamesrvController controller = createNamesrvController(args); start(controller); return controller; } public static NamesrvController start(final NamesrvController controller) throws Exception { //初始化 boolean initResult = controller.initialize(); //啟動 controller.start(); return controller; }
NamesrvStartup作為NameServer的啟動類,主要做了三件事:
- 構(gòu)造NamesrvController(NameServer控制器)
- 加載初始化,由NamesrvController負責(zé)
- 啟動remotingServer,開啟Netty服務(wù)
NamesrvController
public class NamesrvController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); //nameSrv的配置 private final NamesrvConfig namesrvConfig; //netty的配置 private final NettyServerConfig nettyServerConfig; //執(zhí)行單線程的任務(wù)調(diào)度,自定義編程名稱 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); //kv的配置管理 private final KVConfigManager kvConfigManager; //路由管理器,有broker的ip和隊列信息,producer發(fā)送的queue信息,consumer的pull的queue信息 private final RouteInfoManager routeInfoManager; //namesrv的netty的服務(wù)端實現(xiàn) private RemotingServer remotingServer; //處理接受到請求事件的回調(diào)監(jiān)聽服務(wù),主要處理netty的事件 private BrokerHousekeepingService brokerHousekeepingService; private ExecutorService remotingExecutor; private Configuration configuration; private FileWatchService fileWatchService; public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; //構(gòu)造kv配置的管理 this.kvConfigManager = new KVConfigManager(this); //構(gòu)造路由信息管理 this.routeInfoManager = new RouteInfoManager(); //構(gòu)造網(wǎng)絡(luò)連接事件管理 this.brokerHousekeepingService = new BrokerHousekeepingService(this); //配置 this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
NamesrvController作為一個控制器主要負責(zé)NettyServer的創(chuàng)建,注冊requestProcessor,啟動NettyServer及各種task
public boolean initialize() { //加載kv配置 this.kvConfigManager.load(); //初始化NettyServer this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //執(zhí)行器,用于接受請求并進行處理 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注冊Netty的處理器,即remotingExecutor this.registerProcessor(); //進行掃描未活躍的Broker的任務(wù) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //...省略其他代碼 return true; } private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { //注冊request處理器 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } } public void start() throws Exception { //啟動netty server this.remotingServer.start(); if (this.fileWatchService != null) { this.fileWatchService.start(); } }
NettyRequestProcessor
NettyRequestProcessor為請求處理器,上一步注冊的處理器,一般使用默認的處理器DefaultRequestProcessor,processRequest方法處理請求。
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { //... switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } //...省略其他代碼 } return null; }
到此這篇關(guān)于RocketMQ中的NameServer詳細解析的文章就介紹到這了,更多相關(guān)RocketMQ中的NameServer內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java多線程批量處理百萬級的數(shù)據(jù)方法示例
這篇文章主要介紹了java多線程批量處理百萬級的數(shù)據(jù)的相關(guān)資料,文中通過代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用java多線程具有一定的參考借鑒價值,需要的朋友可以參考下2025-02-02淺談java中Math.random()與java.util.random()的區(qū)別
下面小編就為大家?guī)硪黄獪\談java中Math.random()與java.util.random()的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09POI XSSFSheet shiftRows bug問題解決
這篇文章主要介紹了POI XSSFSheet shiftRows bug問題解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-07-07通過代理類實現(xiàn)java連接數(shù)據(jù)庫(使用dao層操作數(shù)據(jù))實例分享
java通過代理類實現(xiàn)數(shù)據(jù)庫DAO操作代碼分享,大家參考使用吧2013-12-12spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解
這篇文章主要給大家介紹了關(guān)于spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 聲明的相關(guān)資料,需要的朋友可以參考下2020-08-08Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式
這篇文章主要介紹了Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式,本文通過實例圖文相結(jié)合給大家介紹的非常詳細,需要的朋友可以參考下2021-07-07RabbitMQ的Direct Exchange模式實現(xiàn)的消息發(fā)布案例(示例代碼)
本文介紹了RabbitMQ的DirectExchange模式下的消息發(fā)布和消費的實現(xiàn),詳細說明了如何在DirectExchange模式中進行消息的發(fā)送和接收,以及消息處理的基本方法,感興趣的朋友跟隨小編一起看看吧2024-09-09