RocketMQ中的通信模塊詳解
通信機(jī)制
RocketMQ消息隊列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4個角色,基本通訊流程如下:
(1) Broker啟動后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時間定時向NameServer上報Topic路由信息。
(2) 消息生產(chǎn)者Producer作為客戶端發(fā)送消息時候,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認(rèn)每隔30s向NameServer拉取一次路由信息。
(3) 消息生產(chǎn)者Producer根據(jù)2)中獲取的路由信息選擇一個隊列(MessageQueue)進(jìn)行消息發(fā)送;Broker作為消息的接收者接收消息并落盤存儲。
(4) 消息消費(fèi)者Consumer根據(jù)2)中獲取的路由信息,并再完成客戶端的負(fù)載均衡后,選擇其中的某一個或者某幾個消息隊列來拉取消息并進(jìn)行消費(fèi)。
從上面1)~3)中可以看出在消息生產(chǎn)者, Broker和NameServer之間都會發(fā)生通信(這里只說了MQ的部分通信),因此如何設(shè)計一個良好的網(wǎng)絡(luò)通信模塊在MQ中至關(guān)重要,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能。
rocketmq-remoting 模塊是 RocketMQ消息隊列中負(fù)責(zé)網(wǎng)絡(luò)通信的模塊,它幾乎被其他所有需要網(wǎng)絡(luò)通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴和引用。為了實現(xiàn)客戶端與服務(wù)器之間高效的數(shù)據(jù)請求與接收,RocketMQ消息隊列自定義了通信協(xié)議并在Netty的基礎(chǔ)之上擴(kuò)展了通信模塊。
Remoting通信類
通信類結(jié)構(gòu):
NettyRemotingServer
NettyRemotingServer為服務(wù)端實現(xiàn)類,在NamesrvController中被構(gòu)造。
NettyRemotingServer構(gòu)造時主要工作是初始化下列屬性,構(gòu)造時判斷useEpoll來決定EventLoopGroup的實現(xiàn)。
private final ServerBootstrap serverBootstrap; private final EventLoopGroup eventLoopGroupSelector; private final EventLoopGroup eventLoopGroupBoss; private final NettyServerConfig nettyServerConfig; private final ExecutorService publicExecutor; private final ChannelEventListener channelEventListener;
NettyRemotingServer啟動
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); prepareSharableHandlers(); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //添加handler,握手、編解碼、idle檢測、連接管理、消息處理 ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
主要關(guān)注initChannel方法中添加的handler,NettyConnectManageHandler
//消息處理核心類 @ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } //連接管理處理類 @ChannelHandler.Sharable class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress); super.channelActive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress); super.channelInactive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } ctx.fireUserEventTriggered(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress); log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } RemotingUtil.closeChannel(ctx.channel()); } }
到此這篇關(guān)于RocketMQ中的通信模塊詳解的文章就介紹到這了,更多相關(guān)RocketMQ通信模塊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot結(jié)果封裝和異常攔截的實現(xiàn)示例
SpringBoot 項目中,我們通常需要將結(jié)果數(shù)據(jù)封裝成特定的格式,以方便客戶端進(jìn)行處理,本文主要介紹了SpringBoot?優(yōu)雅的結(jié)果封裝和異常攔截,感興趣的可以了解一下2023-08-08踩坑之spring事務(wù),非事務(wù)方法與事務(wù)方法執(zhí)行相互調(diào)用方式
這篇文章主要介紹了踩坑之spring事務(wù),非事務(wù)方法與事務(wù)方法執(zhí)行相互調(diào)用方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07Spring中ApplicationContextAware的使用方法詳解
ApplicationContextAware?通過它Spring容器會自動把上下文環(huán)境對象調(diào)用ApplicationContextAware接口中的setApplicationContext方法,這篇文章主要介紹了Spring中ApplicationContextAware的作用,需要的朋友可以參考下2023-03-03Springboot四種事件監(jiān)聽的實現(xiàn)方式詳解
這篇文章主要介紹了Springboot四種事件監(jiān)聽的實現(xiàn)方式,事件監(jiān)聽是一種機(jī)制,可以定義和觸發(fā)自定義的事件,以及在應(yīng)用程序中注冊監(jiān)聽器來響應(yīng)這些事件,需要的朋友可以參考下2022-06-06Java實現(xiàn)入?yún)?shù)據(jù)批量數(shù)據(jù)校驗詳解
在業(yè)務(wù)處理中一般入?yún)⑹菃螚l數(shù)據(jù),這樣數(shù)據(jù)校驗比較容易,但是這種方法對于集合數(shù)據(jù)的校驗不適用,下面我們就來看看如何對入?yún)?shù)據(jù)進(jìn)行批量數(shù)據(jù)校驗吧2024-02-02