欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中的NameServer詳細解析

 更新時間:2024年01月03日 10:39:22   作者:潛水路人甲  
這篇文章主要介紹了RocketMQ中的NameServer詳細解析,NameServer是一個非常簡單的Topic路由注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn),因此不能保證NameServer的一致性,需要的朋友可以參考下

前言

NameServer是一個非常簡單的Topic路由注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。

Producer和Conumser通過NameServer可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。

NameServer各實例間相互不進行信息通訊,因此不能保證NameServer的一致性(Consistency),可以保證可用性(Availability)。

即選擇了CAP中的AP。NameServer只能保證最終一致性,關(guān)于怎么保證最終一致性后文再講。

現(xiàn)在先從NameServer的啟動開始。

NameServer為namesrv模塊

NamesrvStartup

public static NamesrvController main0(String[] args) {
//構(gòu)造NamesrvController 
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            return controller;
}
    public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
        boolean initResult = controller.initialize();
//啟動
        controller.start();
        return controller;
    }
 

NamesrvStartup作為NameServer的啟動類,主要做了三件事:

  • 構(gòu)造NamesrvController(NameServer控制器)
  • 加載初始化,由NamesrvController負責(zé)
  • 啟動remotingServer,開啟Netty服務(wù)

NamesrvController

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
    //nameSrv的配置
    private final NamesrvConfig namesrvConfig;
    //netty的配置
    private final NettyServerConfig nettyServerConfig;
 
    //執(zhí)行單線程的任務(wù)調(diào)度,自定義編程名稱
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
 
    //kv的配置管理
    private final KVConfigManager kvConfigManager;
 
    //路由管理器,有broker的ip和隊列信息,producer發(fā)送的queue信息,consumer的pull的queue信息
    private final RouteInfoManager routeInfoManager;
 
    //namesrv的netty的服務(wù)端實現(xiàn)
    private RemotingServer remotingServer;
 
    //處理接受到請求事件的回調(diào)監(jiān)聽服務(wù),主要處理netty的事件
    private BrokerHousekeepingService brokerHousekeepingService;
 
    private ExecutorService remotingExecutor;
 
    private Configuration configuration;
    private FileWatchService fileWatchService;
 
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        //構(gòu)造kv配置的管理
        this.kvConfigManager = new KVConfigManager(this);
        //構(gòu)造路由信息管理
        this.routeInfoManager = new RouteInfoManager();
        //構(gòu)造網(wǎng)絡(luò)連接事件管理
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        //配置
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

NamesrvController作為一個控制器主要負責(zé)NettyServer的創(chuàng)建,注冊requestProcessor,啟動NettyServer及各種task

    public boolean initialize() {
 
        //加載kv配置
        this.kvConfigManager.load();
        //初始化NettyServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //執(zhí)行器,用于接受請求并進行處理
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //注冊Netty的處理器,即remotingExecutor
        this.registerProcessor();
        //進行掃描未活躍的Broker的任務(wù)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        //...省略其他代碼
        return true;
    }
 
   
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            //注冊request處理器
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }  
  
 
    public void start() throws Exception {
        //啟動netty server
        this.remotingServer.start();
 
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

NettyRequestProcessor

NettyRequestProcessor為請求處理器,上一步注冊的處理器,一般使用默認的處理器DefaultRequestProcessor,processRequest方法處理請求。

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        //...
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            //...省略其他代碼
        }
        return null;
    }

到此這篇關(guān)于RocketMQ中的NameServer詳細解析的文章就介紹到這了,更多相關(guān)RocketMQ中的NameServer內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論