欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ中的通信模塊詳解

 更新時間:2024年01月03日 10:58:31   作者:潛水路人甲  
這篇文章主要介紹了RocketMQ中的通信模塊詳解,RocketMQ消息隊列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4個角色,本文我們簡單來講解一下,需要的朋友可以參考下

 通信機(jī)制

RocketMQ消息隊列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4個角色,基本通訊流程如下:

(1) Broker啟動后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時間定時向NameServer上報Topic路由信息。

(2) 消息生產(chǎn)者Producer作為客戶端發(fā)送消息時候,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認(rèn)每隔30s向NameServer拉取一次路由信息。

(3) 消息生產(chǎn)者Producer根據(jù)2)中獲取的路由信息選擇一個隊列(MessageQueue)進(jìn)行消息發(fā)送;Broker作為消息的接收者接收消息并落盤存儲。

(4) 消息消費(fèi)者Consumer根據(jù)2)中獲取的路由信息,并再完成客戶端的負(fù)載均衡后,選擇其中的某一個或者某幾個消息隊列來拉取消息并進(jìn)行消費(fèi)。

從上面1)~3)中可以看出在消息生產(chǎn)者, Broker和NameServer之間都會發(fā)生通信(這里只說了MQ的部分通信),因此如何設(shè)計一個良好的網(wǎng)絡(luò)通信模塊在MQ中至關(guān)重要,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能。

rocketmq-remoting 模塊是 RocketMQ消息隊列中負(fù)責(zé)網(wǎng)絡(luò)通信的模塊,它幾乎被其他所有需要網(wǎng)絡(luò)通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴和引用。為了實現(xiàn)客戶端與服務(wù)器之間高效的數(shù)據(jù)請求與接收,RocketMQ消息隊列自定義了通信協(xié)議并在Netty的基礎(chǔ)之上擴(kuò)展了通信模塊。

Remoting通信類

通信類結(jié)構(gòu):

NettyRemotingServer

NettyRemotingServer為服務(wù)端實現(xiàn)類,在NamesrvController中被構(gòu)造。

NettyRemotingServer構(gòu)造時主要工作是初始化下列屬性,構(gòu)造時判斷useEpoll來決定EventLoopGroup的實現(xiàn)。

    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
 
    private final ExecutorService publicExecutor;
    private final ChannelEventListener channelEventListener;

NettyRemotingServer啟動

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
 
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
 
        prepareSharableHandlers();
 
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
					//添加handler,握手、編解碼、idle檢測、連接管理、消息處理
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
 
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
 
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
 
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
 
        this.timer.scheduleAtFixedRate(new TimerTask() {
 
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

主要關(guān)注initChannel方法中添加的handler,NettyConnectManageHandler  

    //消息處理核心類   
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    //連接管理處理類
    @ChannelHandler.Sharable
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }
            ctx.fireUserEventTriggered(evt);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

到此這篇關(guān)于RocketMQ中的通信模塊詳解的文章就介紹到這了,更多相關(guān)RocketMQ通信模塊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java 中Map 的用法詳解

    Java 中Map 的用法詳解

    本文主要介紹java 中的Map 接口, 這里對Map 接口下的幾個類做了詳細(xì)介紹,希望對學(xué)習(xí)java 編程的小伙伴有所幫助
    2016-07-07
  • SpringBoot結(jié)果封裝和異常攔截的實現(xiàn)示例

    SpringBoot結(jié)果封裝和異常攔截的實現(xiàn)示例

    SpringBoot 項目中,我們通常需要將結(jié)果數(shù)據(jù)封裝成特定的格式,以方便客戶端進(jìn)行處理,本文主要介紹了SpringBoot?優(yōu)雅的結(jié)果封裝和異常攔截,感興趣的可以了解一下
    2023-08-08
  • JavaWeb項目Servlet無法訪問問題解決

    JavaWeb項目Servlet無法訪問問題解決

    這篇文章主要介紹了JavaWeb項目Servlet無法訪問問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • 踩坑之spring事務(wù),非事務(wù)方法與事務(wù)方法執(zhí)行相互調(diào)用方式

    踩坑之spring事務(wù),非事務(wù)方法與事務(wù)方法執(zhí)行相互調(diào)用方式

    這篇文章主要介紹了踩坑之spring事務(wù),非事務(wù)方法與事務(wù)方法執(zhí)行相互調(diào)用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Spring中ApplicationContextAware的使用方法詳解

    Spring中ApplicationContextAware的使用方法詳解

    ApplicationContextAware?通過它Spring容器會自動把上下文環(huán)境對象調(diào)用ApplicationContextAware接口中的setApplicationContext方法,這篇文章主要介紹了Spring中ApplicationContextAware的作用,需要的朋友可以參考下
    2023-03-03
  • Springboot四種事件監(jiān)聽的實現(xiàn)方式詳解

    Springboot四種事件監(jiān)聽的實現(xiàn)方式詳解

    這篇文章主要介紹了Springboot四種事件監(jiān)聽的實現(xiàn)方式,事件監(jiān)聽是一種機(jī)制,可以定義和觸發(fā)自定義的事件,以及在應(yīng)用程序中注冊監(jiān)聽器來響應(yīng)這些事件,需要的朋友可以參考下
    2022-06-06
  • Java如何將時間戳格式化為日期字符串

    Java如何將時間戳格式化為日期字符串

    這篇文章主要介紹了Java如何將時間戳格式化為日期字符串問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • Spring中集成Groovy的四種方式(小結(jié))

    Spring中集成Groovy的四種方式(小結(jié))

    這篇文章主要介紹了Spring中集成Groovy的四種方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • Java創(chuàng)建線程池的幾種方式代碼示例

    Java創(chuàng)建線程池的幾種方式代碼示例

    這篇文章主要介紹了Java中創(chuàng)建線程池的四種方式,包括使用Executors類、ThreadPoolExecutor類、Future和Callable接口以及Spring的ThreadPoolTaskExecutor,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-01-01
  • Java實現(xiàn)入?yún)?shù)據(jù)批量數(shù)據(jù)校驗詳解

    Java實現(xiàn)入?yún)?shù)據(jù)批量數(shù)據(jù)校驗詳解

    在業(yè)務(wù)處理中一般入?yún)⑹菃螚l數(shù)據(jù),這樣數(shù)據(jù)校驗比較容易,但是這種方法對于集合數(shù)據(jù)的校驗不適用,下面我們就來看看如何對入?yún)?shù)據(jù)進(jìn)行批量數(shù)據(jù)校驗吧
    2024-02-02

最新評論