欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉流程源碼分析

 更新時間:2021年11月15日 14:07:28   作者:又蠢又笨的懶羊羊程序猿  
這篇文章主要為大家介紹了RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉流程源碼分析詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步

NameServer

1.架構(gòu)設(shè)計

消息中間件的設(shè)計思路一般都是基于主題訂閱與發(fā)布的機制,RocketMQ也不例外。RocketMQ中,消息生產(chǎn)者(Producer)發(fā)送某主題的消息到消息服務(wù)器,消息服務(wù)器對消息進行持久化存儲,而消息消費者(Consumer)訂閱所需要的主題,消息服務(wù)器根據(jù)訂閱信息(路由信息)將消息推送至消息消費者(Push模式)或者消息消費者主動向消息服務(wù)器進行拉?。≒ull模式),從而實現(xiàn)消息生產(chǎn)者與消息消費者之間解耦。

為了避免消息服務(wù)器單點故障而導(dǎo)致的系統(tǒng)癱瘓,消息服務(wù)器常常會集群分布,部署多臺服務(wù)器共同處理消息并且承擔(dān)消息的存儲,消息生產(chǎn)者如何知道要將消息發(fā)送至哪臺服務(wù)器和消息消費者如何知道要從哪臺消息服務(wù)器進行消息的拉取等等問題,都要由NameServer來處理,其實NameServer充當(dāng)?shù)慕巧cZookeeper十分相似。

在這里插入圖片描述

Broker消息服務(wù)器啟動時,需要向NameServer集群進行信息注冊,消息生產(chǎn)者Producer發(fā)送消息之前主動向NameServer獲取Broker服務(wù)器地址列表,然后根據(jù)負載均衡算法從列表中選出一臺服務(wù)器進行消息的發(fā)送。NameServer與每臺Broker保持長連接,并每隔30s對Broker存活狀態(tài)進行檢測,如果檢測到Broker宕機并且長時間沒有進行連接重試,則會將該Broker從路由注冊表中刪除,以此保證Broker集群的高可用,但是路由變化不會立馬對生產(chǎn)者進行通知,需要Producer一段時間之后重新向NameServer進行獲取并更新路由信息。這也是NameServer與Zookeeper的不同,NameServer這樣的設(shè)計降低了整個NameServer實現(xiàn)的復(fù)雜度,整個NameServer代碼實現(xiàn)不超過一千行,簡單而高效!

以下是NameServer整個項目預(yù)覽:

在這里插入圖片描述

可以看到NameServer主要有以下幾個作用:

配置信息管理

請求處理

路由信息管理

2.核心類與配置

NamesrvController

NameserController 是 NameServer 模塊的核心控制類。

private final NamesrvConfig namesrvConfig;//主要指定 nameserver 的相關(guān)配置屬性
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));//NameServer定時任務(wù)執(zhí)行線程池-->每隔10s掃描broker,對存活的Broker信息進行維護并且打印KVConfig
private final KVConfigManager kvConfigManager;//讀取或變更NameServer的配置屬性,加載 NamesrvConfig中配置到內(nèi)存
private final RouteInfoManager routeInfoManager;//NameServer 數(shù)據(jù)的載體,記錄 Broker、Topic 等信息。

private final NettyServerConfig nettyServerConfig;//與網(wǎng)絡(luò)通訊相關(guān)的配置
private RemotingServer remotingServer;//網(wǎng)絡(luò)通信服務(wù)
private ExecutorService remotingExecutor;//網(wǎng)絡(luò)通信服務(wù)

NamesrvConfig

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;

rocketmqHome:rocketmq主目錄

kvConfigPath:NameServer存儲KV配置屬性的持久化路徑

configStorePath:nameServer默認配置文件路徑

orderMessageEnable:是否支持順序消息

NettyServerConfig

private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;

listenPort:NameServer監(jiān)聽端口,該值默認會被初始化為9876
serverWorkerThreads:Netty業(yè)務(wù)線程池線程個數(shù)
serverCallbackExecutorThreads:Netty public任務(wù)線程池線程個數(shù),Netty網(wǎng)絡(luò)設(shè)計,根據(jù)業(yè)務(wù)類型會創(chuàng)建不同的線程池,比如處理消息發(fā)送、消息消費、心跳檢測等。
serverSelectorThreads:IO線程池個數(shù),主要是NameServer、Broker端解析請求、返回相應(yīng)的線程個數(shù),這類線程主要是處理網(wǎng)路請求的,解析請求包,然后轉(zhuǎn)發(fā)到各個業(yè)務(wù)線程池完成具體的操作,然后將結(jié)果返回給調(diào)用方;
serverOnewaySemaphoreValue:send oneway消息請求;
serverAsyncSemaphoreValue:異步消息發(fā)送最大并發(fā)數(shù);
serverChannelMaxIdleTimeSeconds :網(wǎng)絡(luò)連接最大的空閑時間,默認120s。
serverSocketSndBufSize:網(wǎng)絡(luò)socket發(fā)送端緩沖區(qū)大小。
serverSocketRcvBufSize: 網(wǎng)絡(luò)socket接收端緩存區(qū)大小。
serverPooledByteBufAllocatorEnable:ByteBuffer是否開啟緩存;
useEpollNativeSelector:是否啟用Epoll IO模型。

RouteInfoManager

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BROKER_CHANNEL_EXPIRED_TIME:NameServer與Broker空閑連接時長,在2 minNameServer之內(nèi)沒有收到Broker的心跳包,則NameServer會關(guān)閉與該Broker的連接并刪除Broker的路由信息。

lock:讀寫鎖,用來保護以下用于存儲關(guān)鍵信息的非線程安全容器HashMap。

topicQueueTable:用于存儲主題與隊列的映射關(guān)系,記錄一個主題topic的隊列分布在哪些Broker上。以下是QueueData屬性值:

private String brokerName;			//broker名稱
private int readQueueNums;			//讀隊列個數(shù)
private int writeQueueNums;			//寫隊列個數(shù)
private int perm;				   //操作權(quán)限	
private int topicSysFlag;			//同步復(fù)制還是異步復(fù)制的標(biāo)識

brokerAddrTable:用于記錄所有Broker信息。以下是BrokerData屬性值:

private String cluster;				//當(dāng)前Broker所屬集群
private String brokerName;			//Broker名稱
//BrokerId=0表示主節(jié)點,BrokerId>0表示從節(jié)點
//記錄BrokerId與對應(yīng)節(jié)點地址的映射信息
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

clusterAddrTable:用于記錄Broker集群信息

brokerLiveTable:用于記錄活躍狀態(tài)的Broker,NameServer每隔10s對所有Broker進行掃描,如果有Broker宕機,會將該Broker從該表中刪去,以此維護可用的Broker列表信息。以下是BrokerLiveInfo的屬性值:

private long lastUpdateTimestamp;		//上次發(fā)送心跳包的時間戳
private DataVersion dataVersion;		//記錄數(shù)據(jù)版本信息
private Channel channel;
private String haServerAddr;			//Master節(jié)點地址

3.啟動與關(guān)閉流程

NameServer啟動時序圖:

在這里插入圖片描述

啟動類:org.apache.rocketmq.namesrv.NamesrvStartup.java

3.1.步驟一

解析配置文件,填充NamesrvConfig、NettyServerConfig并創(chuàng)建NamesrvController

啟動類:

public static void main(String[] args) {
    main0(args);
}
public static NamesrvController main0(String[] args) {

    try {
        //創(chuàng)建NamesrvController的入口
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

NamesrvController#createNamesrvController:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    //....    
    //創(chuàng)建namesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //創(chuàng)建nettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    
    //設(shè)置默認端口9876
    nettyServerConfig.setListenPort(9876);
    //-c 指定屬性配置文件的位置
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    //-p 屬性名=屬性值
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    //將啟動參數(shù)填充到namesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    //如果未指定'ROCKETMQ_HOME'環(huán)境變量
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
	//....
    //打印配置信息日志
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    //根據(jù)namesrvConfig和nettyServerConfig創(chuàng)建NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // 將配置存入controller.configuration以防止配置丟失
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

3.2.步驟二

根據(jù)配置創(chuàng)建好NamesrvController之后,對其進行初始化:

//NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller) throws Exception {

    //進行簡單的檢查
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }	
    //controller初始化
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }    
    //....
    controller.start();		//開啟遠程服務(wù)-this.remotingServer.start();

    return controller;
}
//NamesrvController#initialize
public boolean initialize() {

    	//加載配置管理器
        this.kvConfigManager.load();
		//創(chuàng)建Netty遠程服務(wù)
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
		//創(chuàng)建遠程服務(wù)線程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        //注冊線程池
        this.registerProcessor();
        //定時任務(wù)線程池--->每隔十秒掃描活躍狀態(tài)異常的Broker信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            /**
             * 對Not Active Broker 進行掃描
             */
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //定時任務(wù)線程池--->每隔十秒打印KVConfig信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
       //....
        return true;
    }

3.3.步驟三

在JVM進程關(guān)閉之前,先將線程池關(guān)閉,及時釋放資源。

public static NamesrvController start(final NamesrvController controller) throws Exception {
	//....    
    //JVM進程關(guān)閉之前,將線程池關(guān)閉,資源釋放
    Runtime.getRuntime().addShutdownHook/*注冊JVM鉤子函數(shù)*/(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));	
    //....
}

以上僅供個人學(xué)習(xí)使用,如有不足請指正!

以上就是RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉流程源碼分析的詳細內(nèi)容,更多關(guān)于RocketMQ之NameServer架構(gòu)設(shè)計及啟動關(guān)閉的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Arthas-java程序運行時debug工具使用

    Arthas-java程序運行時debug工具使用

    這篇文章主要介紹了Arthas-java程序運行時debug工具使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • Java數(shù)據(jù)類型分類與基本數(shù)據(jù)類型轉(zhuǎn)換

    Java數(shù)據(jù)類型分類與基本數(shù)據(jù)類型轉(zhuǎn)換

    這篇文章主要介紹了Java數(shù)據(jù)類型分類與基本數(shù)據(jù)類型轉(zhuǎn)換,Java的數(shù)據(jù)類型主要分為兩類,基本數(shù)據(jù)類型、引用數(shù)據(jù)類型,下文詳細介紹,感興趣的朋友可以參考一下
    2022-07-07
  • Java多例Bean的應(yīng)用場景-easyExcel導(dǎo)入

    Java多例Bean的應(yīng)用場景-easyExcel導(dǎo)入

    EasyExcel 是一個基于 Java 的簡單、省內(nèi)存的讀寫 Excel 的開源項目。這篇文章主要介紹了用easyExcel導(dǎo)入Java Bean的應(yīng)用場景,感興趣的朋友可以參考閱讀
    2023-04-04
  • Spring?Mvc中CommonsMultipartFile的特性實例詳解

    Spring?Mvc中CommonsMultipartFile的特性實例詳解

    這篇文章主要給大家介紹了關(guān)于Spring?Mvc中CommonsMultipartFile特性的相關(guān)資料,SpringMVC擁有強大的靈活性,非侵入性和可配置性,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2023-11-11
  • 使用JPA自定義VO接收返回結(jié)果集(unwrap)

    使用JPA自定義VO接收返回結(jié)果集(unwrap)

    這篇文章主要介紹了使用JPA自定義VO接收返回結(jié)果集(unwrap),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • java 驗證碼的生成實現(xiàn)

    java 驗證碼的生成實現(xiàn)

    這篇文章主要介紹了java 驗證碼的生成實現(xiàn)的相關(guān)資料,需要的朋友可以參考下
    2017-08-08
  • 淺談fastjson的常用使用方法

    淺談fastjson的常用使用方法

    下面小編就為大家?guī)硪黄獪\談fastjson的常用使用方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-08-08
  • springboot2.x使用Jsoup防XSS攻擊的實現(xiàn)

    springboot2.x使用Jsoup防XSS攻擊的實現(xiàn)

    這篇文章主要介紹了springboot2.x使用Jsoup防XSS攻擊的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-04-04
  • springboot3解決跨域的幾種方式小結(jié)

    springboot3解決跨域的幾種方式小結(jié)

    這篇文章主要介紹了springboot3解決跨域的幾種方式,文中通過代碼示例給大家介紹的非常詳細,對大家的解決跨域有一定的幫助,需要的朋友可以參考下
    2024-03-03
  • springboot如何配置上傳文件的maxRequestSize

    springboot如何配置上傳文件的maxRequestSize

    這篇文章主要介紹了springboot如何配置上傳文件的maxRequestSize,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論