RocketMQ設(shè)計(jì)之主從復(fù)制和讀寫(xiě)分離
一、主從復(fù)制
RocketMQ為了提高消費(fèi)的高可用性,避免Broker發(fā)生單點(diǎn)故障引起B(yǎng)roker上的消息無(wú)法及時(shí)消費(fèi),同時(shí)避免單個(gè)機(jī)器上硬盤(pán)壞損出現(xiàn)消費(fèi)數(shù)據(jù)丟失。
RocketMQ采用Broker數(shù)據(jù)主從復(fù)制機(jī)制,當(dāng)消息發(fā)送到Master服務(wù)器后會(huì)將消息同步到Slave服務(wù)器,如果Master服務(wù)器宕機(jī),消息消費(fèi)者還可以繼續(xù)從Slave拉取消息。
消息從Master服務(wù)器復(fù)制到Slave服務(wù)器上,有兩種復(fù)制方式:同步復(fù)制SYNC_MASTER
和異步復(fù)制ASYNC_MASTER
。
通過(guò)配置文件conf/broker.conf文件配置:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. ?See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. ?You may obtain a copy of the License at # # ? ? http://www.apache.org/licenses/LICENSE-2.0 # # ?Unless required by applicable law or agreed to in writing, software # ?distributed under the License is distributed on an "AS IS" BASIS, # ?WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # ?See the License for the specific language governing permissions and # ?limitations under the License. brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
對(duì)brokerRole參數(shù)進(jìn)行設(shè)置:
同步復(fù)制:Master和Slave都寫(xiě)成功后才返回客戶(hù)端寫(xiě)成功的狀態(tài)。
- 優(yōu)點(diǎn):Master服務(wù)器出現(xiàn)故障,Slave服務(wù)器上有全部數(shù)據(jù)的備份,很容易恢復(fù)到Master服務(wù)器。
- 缺點(diǎn):由于多了一個(gè)同步等待的步驟,增加數(shù)據(jù)寫(xiě)入延遲,降低系統(tǒng)吞吐量。
異步復(fù)制:僅Master服務(wù)器寫(xiě)成功即可返回給客戶(hù)端寫(xiě)成功的狀態(tài)。
- 優(yōu)點(diǎn):沒(méi)有同步等待的步驟,低延遲,高吞吐。
- 缺點(diǎn):如果Master服務(wù)器出現(xiàn)故障,有些數(shù)據(jù)可能未寫(xiě)入Slave服務(wù)器,未同步的數(shù)據(jù)可能丟失
實(shí)際應(yīng)用中,需要結(jié)合業(yè)務(wù)場(chǎng)景,合理設(shè)置刷盤(pán)方式和主從復(fù)制方式。不建議使用同步刷盤(pán)方式,因?yàn)樗l繁觸發(fā)寫(xiě)磁盤(pán)操作,性能下降很明顯。**通常把Master
和Slave
設(shè)置為異步刷盤(pán),同步復(fù)制,保證數(shù)據(jù)不丟失。**這樣即使一臺(tái)服務(wù)器出故障,仍然可以保證數(shù)據(jù)不丟失。
二、讀寫(xiě)分離
讀寫(xiě)分離機(jī)制是高性能、高可用架構(gòu)中常見(jiàn)的設(shè)計(jì),例如Mysql實(shí)現(xiàn)讀寫(xiě)分離機(jī)制,Client只能從Master服務(wù)器寫(xiě)數(shù)據(jù),可以從Master服務(wù)器和Slave服務(wù)器都讀數(shù)據(jù)。
RocketMQ的Consumer
在拉取消息時(shí),Broker會(huì)判斷Master服務(wù)器的消息堆積量來(lái)決定Consumer是否從Slave服務(wù)器拉取消息消費(fèi)。默認(rèn)一開(kāi)始從Master服務(wù)器拉群消息,如果Master服務(wù)器的消息堆積超過(guò)物理內(nèi)存40%,則會(huì)返回給Consumer的消息結(jié)果并告知Consumer,下次從其他Slave服務(wù)器上拉取消息。
RocketMQ 有屬于自己的一套讀寫(xiě)分離邏輯,會(huì)判斷主服務(wù)器的消息堆積量來(lái)決定消費(fèi)者是否向從服務(wù)器拉取消息消費(fèi)。
Consumer
在向 Broker 發(fā)送消息拉取請(qǐng)求時(shí),會(huì)根據(jù)篩選出來(lái)的消息隊(duì)列,判定是從Master,還是從Slave拉取消息,默認(rèn)是Master。
Broker 接收到消息消費(fèi)者拉取請(qǐng)求,在獲取本地堆積的消息量后,會(huì)計(jì)算服務(wù)器的消息堆積量是否大于物理內(nèi)存的一定值,如果是,則標(biāo)記下次從 Slave服務(wù)器拉取,計(jì)算 Slave服務(wù)器的 Broker Id,并響應(yīng)給消費(fèi)者。
Consumer在接收到 Broker的響應(yīng)后,會(huì)把消息隊(duì)列與建議下一次拉取節(jié)點(diǎn)的 Broker Id 關(guān)聯(lián)起來(lái),并緩存在內(nèi)存中,以便下次拉取消息時(shí),確定從哪個(gè)節(jié)點(diǎn)發(fā)送請(qǐng)求。
public class GetMessageResult { ? ? private final List<SelectMappedBufferResult> messageMapedList = ? ? ? ? new ArrayList<SelectMappedBufferResult>(100); ? ? private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); ? ? private GetMessageStatus status; ? ? private long nextBeginOffset; ? ? private long minOffset; ? ? private long maxOffset; ? ? private int bufferTotalSize = 0; ? ? // 標(biāo)識(shí)是否通過(guò)Slave拉拉取消息 ? ? private boolean suggestPullingFromSlave = false; ? ? private int msgCount4Commercial = 0; } // 針對(duì)消息堆積量過(guò)大會(huì)切換到Slave進(jìn)行查詢(xún)。 // maxOffsetPy 為當(dāng)前最大物理偏移量,maxPhyOffsetPulling 為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量。 // TOTAL_PHYSICAL_MEMORY_SIZE 表示當(dāng)前系統(tǒng)物理內(nèi)存,accessMessageInMemoryMaxRatio 的默認(rèn)值為 40, // 以上邏輯即可算出當(dāng)前消息堆積量是否大于物理內(nèi)存的 40%,如果大于則將 suggestPullingFromSlave 設(shè)置為 true。 long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE ? ? * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory);
- 決定消費(fèi)者是否向從服務(wù)器拉取消息消費(fèi)的值存在
GetMessageResult
類(lèi)中。 suggestPullingFromSlave
的默認(rèn)值為 false,即默認(rèn)消費(fèi)者不會(huì)消費(fèi)從服務(wù)器,但它會(huì)在消費(fèi)者發(fā)送消息拉取請(qǐng)求時(shí),動(dòng)態(tài)改變?cè)撝?,Broker 接收、處理消費(fèi)者拉取消息請(qǐng)求。- 針對(duì)本MessageQueue消息堆積量過(guò)大會(huì)切換到Slave進(jìn)行查詢(xún),maxOffsetPy 為當(dāng)前最大物理偏移量,
maxPhyOffsetPulling
為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量,當(dāng)前消息堆積量是否大于物理內(nèi)存的 40%就會(huì)切換到Slave進(jìn)行查詢(xún)。
public class PullMessageResponseHeader implements CommandCustomHeader { ? ? // suggestWhichBrokerId標(biāo)識(shí)從哪個(gè)broker進(jìn)行查詢(xún) ? ? private Long suggestWhichBrokerId; ? ? private Long nextBeginOffset; ? ? private Long minOffset; ? ? private Long maxOffset; } public class PullMessageProcessor implements NettyRequestProcessor { ? ? private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) ? ? ? ? throws RemotingCommandException { ? ? ? ? RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); ? ? ? ? final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); ? ? ? ? final PullMessageRequestHeader requestHeader = ? ? ? ? ? ? (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); ? ? ? ? response.setOpaque(request.getOpaque()); ? ? ? ? final GetMessageResult getMessageResult = ? ? ? ? ? ? this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ? ? ? ? ? ? ? ? requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); ? ? ? ? if (getMessageResult != null) { ? ? ? ? ? ? response.setRemark(getMessageResult.getStatus().name()); ? ? ? ? ? ? responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); ? ? ? ? ? ? responseHeader.setMinOffset(getMessageResult.getMinOffset()); ? ? ? ? ? ? responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); ? ? ? ? ? ? // 建議從slave消費(fèi)消息 ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) { ? ? ? ? ? ? ? ? // 從slave查詢(xún) ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? // 從master查詢(xún) ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? } ? ? ? ? ? ? switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { ? ? ? ? ? ? ? ? case ASYNC_MASTER: ? ? ? ? ? ? ? ? case SYNC_MASTER: ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? case SLAVE: ? ? ? ? ? ? ? ? ? ? // 針對(duì)SLAVE需要判斷是否可讀,不可讀的情況下讀MASTER ? ? ? ? ? ? ? ? ? ? if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { ? ? ? ? ? ? ? ? ? ? ? ? response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); ? ? ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? } ? ? ? ? ? ? if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { ? ? ? ? ? ? ? ? // consume too slow ,redirect to another machine ? ? ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) { ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? // consume ok ? ? ? ? ? ? ? ? else { ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? return response; ? ? } }
PullMessageResponseHeader
的suggestWhichBrokerId
標(biāo)識(shí)某個(gè)MessageQueue
的消息從具體的brokerId進(jìn)行查詢(xún)。
針對(duì)Slave不可讀的情況會(huì)設(shè)置為從MASTER_ID進(jìn)行查詢(xún)。
public class PullAPIWrapper { ? ? private final InternalLogger log = ClientLogger.getLog(); ? ? private final MQClientInstance mQClientFactory; ? ? private final String consumerGroup; ? ? private final boolean unitMode; ? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = ? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32); ? ? private volatile boolean connectBrokerByUser = false; ? ? private volatile long defaultBrokerId = MixAll.MASTER_ID; ? ? private Random random = new Random(System.currentTimeMillis()); ? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); ? ? public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, ? ? ? ? final SubscriptionData subscriptionData) { ? ? ? ? PullResultExt pullResultExt = (PullResultExt) pullResult; ? ? ? ? // 處理MessageQueue對(duì)應(yīng)拉取的brokerId ? ? ? ? this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); ? ? ? ? // 省略相關(guān)代碼 ? ? ? ? pullResultExt.setMessageBinary(null); ? ? ? ? return pullResult; ? ? } ? ? public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { ? ? ? ? // 保存在pullFromWhichNodeTable對(duì)象中 ? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); ? ? ? ? if (null == suggest) { ? ? ? ? ? ? this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); ? ? ? ? } else { ? ? ? ? ? ? suggest.set(brokerId); ? ? ? ? } ? ? } }
Consumer
收到拉取響應(yīng)回來(lái)的數(shù)據(jù)后,會(huì)將下次建議拉取的 brokerId
緩存起來(lái)。
public class PullAPIWrapper { ? ? private final InternalLogger log = ClientLogger.getLog(); ? ? private final MQClientInstance mQClientFactory; ? ? private final String consumerGroup; ? ? private final boolean unitMode; ? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = ? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32); ? ? private volatile boolean connectBrokerByUser = false; ? ? private volatile long defaultBrokerId = MixAll.MASTER_ID; ? ? private Random random = new Random(System.currentTimeMillis()); ? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); ? ? public PullResult pullKernelImpl( ? ? ? ? final MessageQueue mq, ? ? ? ? final String subExpression, ? ? ? ? final String expressionType, ? ? ? ? final long subVersion, ? ? ? ? final long offset, ? ? ? ? final int maxNums, ? ? ? ? final int sysFlag, ? ? ? ? final long commitOffset, ? ? ? ? final long brokerSuspendMaxTimeMillis, ? ? ? ? final long timeoutMillis, ? ? ? ? final CommunicationMode communicationMode, ? ? ? ? final PullCallback pullCallback ? ? ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ? ? ? ? // 查找MessageQueue應(yīng)該從brokerName的哪個(gè)節(jié)點(diǎn)查詢(xún) ? ? ? ? FindBrokerResult findBrokerResult = ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false); ? ? ? ? if (null == findBrokerResult) { ? ? ? ? ? ? this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); ? ? ? ? ? ? findBrokerResult = ? ? ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), ? ? ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false); ? ? ? ? } ? ? ? ? if (findBrokerResult != null) { ? ? ? ? ? ? { ? ? ? ? ? ? ? ? // check version ? ? ? ? ? ? ? ? if (!ExpressionType.isTagType(expressionType) ? ? ? ? ? ? ? ? ? ? && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { ? ? ? ? ? ? ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + ", " ? ? ? ? ? ? ? ? ? ? ? ? + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? int sysFlagInner = sysFlag; ? ? ? ? ? ? if (findBrokerResult.isSlave()) { ? ? ? ? ? ? ? ? sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); ? ? ? ? ? ? } ? ? ? ? ? ? PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); ? ? ? ? ? ? requestHeader.setConsumerGroup(this.consumerGroup); ? ? ? ? ? ? requestHeader.setTopic(mq.getTopic()); ? ? ? ? ? ? requestHeader.setQueueId(mq.getQueueId()); ? ? ? ? ? ? requestHeader.setQueueOffset(offset); ? ? ? ? ? ? requestHeader.setMaxMsgNums(maxNums); ? ? ? ? ? ? requestHeader.setSysFlag(sysFlagInner); ? ? ? ? ? ? requestHeader.setCommitOffset(commitOffset); ? ? ? ? ? ? requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); ? ? ? ? ? ? requestHeader.setSubscription(subExpression); ? ? ? ? ? ? requestHeader.setSubVersion(subVersion); ? ? ? ? ? ? requestHeader.setExpressionType(expressionType); ? ? ? ? ? ? String brokerAddr = findBrokerResult.getBrokerAddr(); ? ? ? ? ? ? if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { ? ? ? ? ? ? ? ? brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); ? ? ? ? ? ? } ? ? ? ? ? ? PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( ? ? ? ? ? ? ? ? brokerAddr, ? ? ? ? ? ? ? ? requestHeader, ? ? ? ? ? ? ? ? timeoutMillis, ? ? ? ? ? ? ? ? communicationMode, ? ? ? ? ? ? ? ? pullCallback); ? ? ? ? ? ? return pullResult; ? ? ? ? } ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); ? ? } ? ? public long recalculatePullFromWhichNode(final MessageQueue mq) { ? ? ? ? if (this.isConnectBrokerByUser()) { ? ? ? ? ? ? return this.defaultBrokerId; ? ? ? ? } ? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); ? ? ? ? if (suggest != null) { ? ? ? ? ? ? return suggest.get(); ? ? ? ? } ? ? ? ? return MixAll.MASTER_ID; ? ? } }
Consumer
拉取消息的時(shí)候會(huì)從 pullFromWhichNodeTable
中取出拉取 brokerId確定去具體的broker進(jìn)行查詢(xún)。
到此這篇關(guān)于RocketMQ設(shè)計(jì)之主從復(fù)制和讀寫(xiě)分離的文章就介紹到這了,更多相關(guān)RocketMQ從復(fù)制和讀寫(xiě)分離內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對(duì)比
批量插入功能是我們?nèi)粘9ぷ髦斜容^常見(jiàn)的業(yè)務(wù)功能之一,下面這篇文章主要給大家介紹了關(guān)于Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對(duì)比的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01Java基礎(chǔ)之重載(Overload)與重寫(xiě)(Override)詳解
這篇文章主要介紹了Java基礎(chǔ)之重載(Overload)與重寫(xiě)(Override)詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04Java/Android 實(shí)現(xiàn)簡(jiǎn)單的HTTP服務(wù)器
這篇文章主要介紹了Java/Android 如何實(shí)現(xiàn)簡(jiǎn)單的HTTP服務(wù)器,幫助大家更好的進(jìn)行功能測(cè)試,感興趣的朋友可以了解下2020-10-10mybatis返回的map結(jié)果如何設(shè)置有序
這篇文章主要介紹了mybatis返回的map結(jié)果如何設(shè)置有序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01MyBatis通過(guò)JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語(yǔ)句問(wèn)題
這篇文章主要介紹了MyBatis通過(guò)JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語(yǔ)句問(wèn)題的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-08-08