RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉流程源碼分析
NameServer
1.架構(gòu)設(shè)計
消息中間件的設(shè)計思路一般都是基于主題訂閱與發(fā)布的機制,RocketMQ也不例外。RocketMQ中,消息生產(chǎn)者(Producer)發(fā)送某主題的消息到消息服務(wù)器,消息服務(wù)器對消息進行持久化存儲,而消息消費者(Consumer)訂閱所需要的主題,消息服務(wù)器根據(jù)訂閱信息(路由信息)將消息推送至消息消費者(Push模式)或者消息消費者主動向消息服務(wù)器進行拉?。≒ull模式),從而實現(xiàn)消息生產(chǎn)者與消息消費者之間解耦。
為了避免消息服務(wù)器單點故障而導(dǎo)致的系統(tǒng)癱瘓,消息服務(wù)器常常會集群分布,部署多臺服務(wù)器共同處理消息并且承擔(dān)消息的存儲,消息生產(chǎn)者如何知道要將消息發(fā)送至哪臺服務(wù)器和消息消費者如何知道要從哪臺消息服務(wù)器進行消息的拉取等等問題,都要由NameServer
來處理,其實NameServer
充當(dāng)?shù)慕巧cZookeeper
十分相似。
Broker
消息服務(wù)器啟動時,需要向NameServer
集群進行信息注冊,消息生產(chǎn)者Producer
發(fā)送消息之前主動向NameServer獲取Broker服務(wù)器地址列表,然后根據(jù)負載均衡算法從列表中選出一臺服務(wù)器進行消息的發(fā)送。NameServer與每臺Broker保持長連接,并每隔30s
對Broker存活狀態(tài)進行檢測,如果檢測到Broker宕機并且長時間沒有進行連接重試,則會將該Broker從路由注冊表中刪除,以此保證Broker集群的高可用,但是路由變化不會立馬對生產(chǎn)者進行通知,需要Producer一段時間之后重新向NameServer進行獲取并更新路由信息。這也是NameServer與Zookeeper的不同,NameServer這樣的設(shè)計降低了整個NameServer實現(xiàn)的復(fù)雜度,整個NameServer代碼實現(xiàn)不超過一千行,簡單而高效!
以下是NameServer整個項目預(yù)覽:
可以看到NameServer主要有以下幾個作用:
配置信息管理
請求處理
路由信息管理
2.核心類與配置
NamesrvController
NameserController 是 NameServer 模塊的核心控制類。
private final NamesrvConfig namesrvConfig;//主要指定 nameserver 的相關(guān)配置屬性 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));//NameServer定時任務(wù)執(zhí)行線程池-->每隔10s掃描broker,對存活的Broker信息進行維護并且打印KVConfig private final KVConfigManager kvConfigManager;//讀取或變更NameServer的配置屬性,加載 NamesrvConfig中配置到內(nèi)存 private final RouteInfoManager routeInfoManager;//NameServer 數(shù)據(jù)的載體,記錄 Broker、Topic 等信息。 private final NettyServerConfig nettyServerConfig;//與網(wǎng)絡(luò)通訊相關(guān)的配置 private RemotingServer remotingServer;//網(wǎng)絡(luò)通信服務(wù) private ExecutorService remotingExecutor;//網(wǎng)絡(luò)通信服務(wù)
NamesrvConfig
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;
rocketmqHome
:rocketmq主目錄
kvConfigPath
:NameServer存儲KV配置屬性的持久化路徑
configStorePath
:nameServer默認配置文件路徑
orderMessageEnable
:是否支持順序消息
NettyServerConfig
private int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serverOnewaySemaphoreValue = 256; private int serverAsyncSemaphoreValue = 64; private int serverChannelMaxIdleTimeSeconds = 120; private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;
listenPort
:NameServer監(jiān)聽端口,該值默認會被初始化為9876
serverWorkerThreads
:Netty業(yè)務(wù)線程池線程個數(shù)
serverCallbackExecutorThreads
:Netty public任務(wù)線程池線程個數(shù),Netty網(wǎng)絡(luò)設(shè)計,根據(jù)業(yè)務(wù)類型會創(chuàng)建不同的線程池,比如處理消息發(fā)送、消息消費、心跳檢測等。
serverSelectorThreads
:IO線程池個數(shù),主要是NameServer、Broker端解析請求、返回相應(yīng)的線程個數(shù),這類線程主要是處理網(wǎng)路請求的,解析請求包,然后轉(zhuǎn)發(fā)到各個業(yè)務(wù)線程池完成具體的操作,然后將結(jié)果返回給調(diào)用方;
serverOnewaySemaphoreValue
:send oneway消息請求;
serverAsyncSemaphoreValue
:異步消息發(fā)送最大并發(fā)數(shù);
serverChannelMaxIdleTimeSeconds
:網(wǎng)絡(luò)連接最大的空閑時間,默認120s。
serverSocketSndBufSize
:網(wǎng)絡(luò)socket發(fā)送端緩沖區(qū)大小。
serverSocketRcvBufSize
: 網(wǎng)絡(luò)socket接收端緩存區(qū)大小。
serverPooledByteBufAllocatorEnable
:ByteBuffer是否開啟緩存;
useEpollNativeSelector
:是否啟用Epoll IO模型。
RouteInfoManager
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
BROKER_CHANNEL_EXPIRED_TIME:NameServer與Broker空閑連接時長,在2 min
NameServer之內(nèi)沒有收到Broker的心跳包,則NameServer會關(guān)閉與該Broker的連接并刪除Broker的路由信息。
lock
:讀寫鎖,用來保護以下用于存儲關(guān)鍵信息的非線程安全容器HashMap。
topicQueueTable
:用于存儲主題與隊列的映射關(guān)系,記錄一個主題topic的隊列分布在哪些Broker上。以下是QueueData
屬性值:
private String brokerName; //broker名稱 private int readQueueNums; //讀隊列個數(shù) private int writeQueueNums; //寫隊列個數(shù) private int perm; //操作權(quán)限 private int topicSysFlag; //同步復(fù)制還是異步復(fù)制的標(biāo)識
brokerAddrTable:用于記錄所有Broker信息。以下是BrokerData屬性值:
private String cluster; //當(dāng)前Broker所屬集群 private String brokerName; //Broker名稱 //BrokerId=0表示主節(jié)點,BrokerId>0表示從節(jié)點 //記錄BrokerId與對應(yīng)節(jié)點地址的映射信息 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
clusterAddrTable:用于記錄Broker集群信息
brokerLiveTable:用于記錄活躍狀態(tài)的Broker,NameServer每隔10s對所有Broker進行掃描,如果有Broker宕機,會將該Broker從該表中刪去,以此維護可用的Broker列表信息。以下是BrokerLiveInfo的屬性值:
private long lastUpdateTimestamp; //上次發(fā)送心跳包的時間戳 private DataVersion dataVersion; //記錄數(shù)據(jù)版本信息 private Channel channel; private String haServerAddr; //Master節(jié)點地址
3.啟動與關(guān)閉流程
NameServer啟動時序圖:
啟動類:org.apache.rocketmq.namesrv.NamesrvStartup.java
3.1.步驟一
解析配置文件,填充NamesrvConfig
、NettyServerConfig
并創(chuàng)建NamesrvController
:
啟動類:
public static void main(String[] args) { main0(args); } public static NamesrvController main0(String[] args) { try { //創(chuàng)建NamesrvController的入口 NamesrvController controller = createNamesrvController(args); start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
NamesrvController#createNamesrvController:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { //.... //創(chuàng)建namesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); //創(chuàng)建nettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //設(shè)置默認端口9876 nettyServerConfig.setListenPort(9876); //-c 指定屬性配置文件的位置 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //-p 屬性名=屬性值 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } //將啟動參數(shù)填充到namesrvConfig中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //如果未指定'ROCKETMQ_HOME'環(huán)境變量 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //.... //打印配置信息日志 MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //根據(jù)namesrvConfig和nettyServerConfig創(chuàng)建NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // 將配置存入controller.configuration以防止配置丟失 controller.getConfiguration().registerConfig(properties); return controller; }
3.2.步驟二
根據(jù)配置創(chuàng)建好NamesrvController之后,對其進行初始化:
//NamesrvStartup#start public static NamesrvController start(final NamesrvController controller) throws Exception { //進行簡單的檢查 if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } //controller初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //.... controller.start(); //開啟遠程服務(wù)-this.remotingServer.start(); return controller; } //NamesrvController#initialize public boolean initialize() { //加載配置管理器 this.kvConfigManager.load(); //創(chuàng)建Netty遠程服務(wù) this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //創(chuàng)建遠程服務(wù)線程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注冊線程池 this.registerProcessor(); //定時任務(wù)線程池--->每隔十秒掃描活躍狀態(tài)異常的Broker信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { /** * 對Not Active Broker 進行掃描 */ @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定時任務(wù)線程池--->每隔十秒打印KVConfig信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //.... return true; }
3.3.步驟三
在JVM進程關(guān)閉之前,先將線程池關(guān)閉,及時釋放資源。
public static NamesrvController start(final NamesrvController controller) throws Exception { //.... //JVM進程關(guān)閉之前,將線程池關(guān)閉,資源釋放 Runtime.getRuntime().addShutdownHook/*注冊JVM鉤子函數(shù)*/(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); //.... }
以上僅供個人學(xué)習(xí)使用,如有不足請指正!
以上就是RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉流程源碼分析的詳細內(nèi)容,更多關(guān)于RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java數(shù)據(jù)類型分類與基本數(shù)據(jù)類型轉(zhuǎn)換
這篇文章主要介紹了Java數(shù)據(jù)類型分類與基本數(shù)據(jù)類型轉(zhuǎn)換,Java的數(shù)據(jù)類型主要分為兩類,基本數(shù)據(jù)類型、引用數(shù)據(jù)類型,下文詳細介紹,感興趣的朋友可以參考一下2022-07-07Java多例Bean的應(yīng)用場景-easyExcel導(dǎo)入
EasyExcel 是一個基于 Java 的簡單、省內(nèi)存的讀寫 Excel 的開源項目。這篇文章主要介紹了用easyExcel導(dǎo)入Java Bean的應(yīng)用場景,感興趣的朋友可以參考閱讀2023-04-04Spring?Mvc中CommonsMultipartFile的特性實例詳解
這篇文章主要給大家介紹了關(guān)于Spring?Mvc中CommonsMultipartFile特性的相關(guān)資料,SpringMVC擁有強大的靈活性,非侵入性和可配置性,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2023-11-11使用JPA自定義VO接收返回結(jié)果集(unwrap)
這篇文章主要介紹了使用JPA自定義VO接收返回結(jié)果集(unwrap),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11springboot2.x使用Jsoup防XSS攻擊的實現(xiàn)
這篇文章主要介紹了springboot2.x使用Jsoup防XSS攻擊的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04springboot如何配置上傳文件的maxRequestSize
這篇文章主要介紹了springboot如何配置上傳文件的maxRequestSize,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03