RocketMQ設計之主從復制和讀寫分離
一、主從復制
RocketMQ為了提高消費的高可用性,避免Broker發(fā)生單點故障引起B(yǎng)roker上的消息無法及時消費,同時避免單個機器上硬盤壞損出現(xiàn)消費數(shù)據(jù)丟失。
RocketMQ采用Broker數(shù)據(jù)主從復制機制,當消息發(fā)送到Master服務器后會將消息同步到Slave服務器,如果Master服務器宕機,消息消費者還可以繼續(xù)從Slave拉取消息。
消息從Master服務器復制到Slave服務器上,有兩種復制方式:同步復制SYNC_MASTER
和異步復制ASYNC_MASTER
。
通過配置文件conf/broker.conf文件配置:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. ?See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. ?You may obtain a copy of the License at # # ? ? http://www.apache.org/licenses/LICENSE-2.0 # # ?Unless required by applicable law or agreed to in writing, software # ?distributed under the License is distributed on an "AS IS" BASIS, # ?WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # ?See the License for the specific language governing permissions and # ?limitations under the License. brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
對brokerRole參數(shù)進行設置:
同步復制:Master和Slave都寫成功后才返回客戶端寫成功的狀態(tài)。
- 優(yōu)點:Master服務器出現(xiàn)故障,Slave服務器上有全部數(shù)據(jù)的備份,很容易恢復到Master服務器。
- 缺點:由于多了一個同步等待的步驟,增加數(shù)據(jù)寫入延遲,降低系統(tǒng)吞吐量。
異步復制:僅Master服務器寫成功即可返回給客戶端寫成功的狀態(tài)。
- 優(yōu)點:沒有同步等待的步驟,低延遲,高吞吐。
- 缺點:如果Master服務器出現(xiàn)故障,有些數(shù)據(jù)可能未寫入Slave服務器,未同步的數(shù)據(jù)可能丟失
實際應用中,需要結合業(yè)務場景,合理設置刷盤方式和主從復制方式。不建議使用同步刷盤方式,因為它頻繁觸發(fā)寫磁盤操作,性能下降很明顯。**通常把Master
和Slave
設置為異步刷盤,同步復制,保證數(shù)據(jù)不丟失。**這樣即使一臺服務器出故障,仍然可以保證數(shù)據(jù)不丟失。
二、讀寫分離
讀寫分離機制是高性能、高可用架構中常見的設計,例如Mysql實現(xiàn)讀寫分離機制,Client只能從Master服務器寫數(shù)據(jù),可以從Master服務器和Slave服務器都讀數(shù)據(jù)。
RocketMQ的Consumer
在拉取消息時,Broker會判斷Master服務器的消息堆積量來決定Consumer是否從Slave服務器拉取消息消費。默認一開始從Master服務器拉群消息,如果Master服務器的消息堆積超過物理內存40%,則會返回給Consumer的消息結果并告知Consumer,下次從其他Slave服務器上拉取消息。
RocketMQ 有屬于自己的一套讀寫分離邏輯,會判斷主服務器的消息堆積量來決定消費者是否向從服務器拉取消息消費。
Consumer
在向 Broker 發(fā)送消息拉取請求時,會根據(jù)篩選出來的消息隊列,判定是從Master,還是從Slave拉取消息,默認是Master。
Broker 接收到消息消費者拉取請求,在獲取本地堆積的消息量后,會計算服務器的消息堆積量是否大于物理內存的一定值,如果是,則標記下次從 Slave服務器拉取,計算 Slave服務器的 Broker Id,并響應給消費者。
Consumer在接收到 Broker的響應后,會把消息隊列與建議下一次拉取節(jié)點的 Broker Id 關聯(lián)起來,并緩存在內存中,以便下次拉取消息時,確定從哪個節(jié)點發(fā)送請求。
public class GetMessageResult { ? ? private final List<SelectMappedBufferResult> messageMapedList = ? ? ? ? new ArrayList<SelectMappedBufferResult>(100); ? ? private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); ? ? private GetMessageStatus status; ? ? private long nextBeginOffset; ? ? private long minOffset; ? ? private long maxOffset; ? ? private int bufferTotalSize = 0; ? ? // 標識是否通過Slave拉拉取消息 ? ? private boolean suggestPullingFromSlave = false; ? ? private int msgCount4Commercial = 0; } // 針對消息堆積量過大會切換到Slave進行查詢。 // maxOffsetPy 為當前最大物理偏移量,maxPhyOffsetPulling 為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量。 // TOTAL_PHYSICAL_MEMORY_SIZE 表示當前系統(tǒng)物理內存,accessMessageInMemoryMaxRatio 的默認值為 40, // 以上邏輯即可算出當前消息堆積量是否大于物理內存的 40%,如果大于則將 suggestPullingFromSlave 設置為 true。 long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE ? ? * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory);
- 決定消費者是否向從服務器拉取消息消費的值存在
GetMessageResult
類中。 suggestPullingFromSlave
的默認值為 false,即默認消費者不會消費從服務器,但它會在消費者發(fā)送消息拉取請求時,動態(tài)改變該值,Broker 接收、處理消費者拉取消息請求。- 針對本MessageQueue消息堆積量過大會切換到Slave進行查詢,maxOffsetPy 為當前最大物理偏移量,
maxPhyOffsetPulling
為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量,當前消息堆積量是否大于物理內存的 40%就會切換到Slave進行查詢。
public class PullMessageResponseHeader implements CommandCustomHeader { ? ? // suggestWhichBrokerId標識從哪個broker進行查詢 ? ? private Long suggestWhichBrokerId; ? ? private Long nextBeginOffset; ? ? private Long minOffset; ? ? private Long maxOffset; } public class PullMessageProcessor implements NettyRequestProcessor { ? ? private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) ? ? ? ? throws RemotingCommandException { ? ? ? ? RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); ? ? ? ? final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); ? ? ? ? final PullMessageRequestHeader requestHeader = ? ? ? ? ? ? (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); ? ? ? ? response.setOpaque(request.getOpaque()); ? ? ? ? final GetMessageResult getMessageResult = ? ? ? ? ? ? this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ? ? ? ? ? ? ? ? requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); ? ? ? ? if (getMessageResult != null) { ? ? ? ? ? ? response.setRemark(getMessageResult.getStatus().name()); ? ? ? ? ? ? responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); ? ? ? ? ? ? responseHeader.setMinOffset(getMessageResult.getMinOffset()); ? ? ? ? ? ? responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); ? ? ? ? ? ? // 建議從slave消費消息 ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) { ? ? ? ? ? ? ? ? // 從slave查詢 ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? // 從master查詢 ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? } ? ? ? ? ? ? switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { ? ? ? ? ? ? ? ? case ASYNC_MASTER: ? ? ? ? ? ? ? ? case SYNC_MASTER: ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? case SLAVE: ? ? ? ? ? ? ? ? ? ? // 針對SLAVE需要判斷是否可讀,不可讀的情況下讀MASTER ? ? ? ? ? ? ? ? ? ? if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { ? ? ? ? ? ? ? ? ? ? ? ? response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); ? ? ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? } ? ? ? ? ? ? if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { ? ? ? ? ? ? ? ? // consume too slow ,redirect to another machine ? ? ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) { ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? // consume ok ? ? ? ? ? ? ? ? else { ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? return response; ? ? } }
PullMessageResponseHeader
的suggestWhichBrokerId
標識某個MessageQueue
的消息從具體的brokerId進行查詢。
針對Slave不可讀的情況會設置為從MASTER_ID進行查詢。
public class PullAPIWrapper { ? ? private final InternalLogger log = ClientLogger.getLog(); ? ? private final MQClientInstance mQClientFactory; ? ? private final String consumerGroup; ? ? private final boolean unitMode; ? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = ? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32); ? ? private volatile boolean connectBrokerByUser = false; ? ? private volatile long defaultBrokerId = MixAll.MASTER_ID; ? ? private Random random = new Random(System.currentTimeMillis()); ? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); ? ? public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, ? ? ? ? final SubscriptionData subscriptionData) { ? ? ? ? PullResultExt pullResultExt = (PullResultExt) pullResult; ? ? ? ? // 處理MessageQueue對應拉取的brokerId ? ? ? ? this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); ? ? ? ? // 省略相關代碼 ? ? ? ? pullResultExt.setMessageBinary(null); ? ? ? ? return pullResult; ? ? } ? ? public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { ? ? ? ? // 保存在pullFromWhichNodeTable對象中 ? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); ? ? ? ? if (null == suggest) { ? ? ? ? ? ? this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); ? ? ? ? } else { ? ? ? ? ? ? suggest.set(brokerId); ? ? ? ? } ? ? } }
Consumer
收到拉取響應回來的數(shù)據(jù)后,會將下次建議拉取的 brokerId
緩存起來。
public class PullAPIWrapper { ? ? private final InternalLogger log = ClientLogger.getLog(); ? ? private final MQClientInstance mQClientFactory; ? ? private final String consumerGroup; ? ? private final boolean unitMode; ? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = ? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32); ? ? private volatile boolean connectBrokerByUser = false; ? ? private volatile long defaultBrokerId = MixAll.MASTER_ID; ? ? private Random random = new Random(System.currentTimeMillis()); ? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); ? ? public PullResult pullKernelImpl( ? ? ? ? final MessageQueue mq, ? ? ? ? final String subExpression, ? ? ? ? final String expressionType, ? ? ? ? final long subVersion, ? ? ? ? final long offset, ? ? ? ? final int maxNums, ? ? ? ? final int sysFlag, ? ? ? ? final long commitOffset, ? ? ? ? final long brokerSuspendMaxTimeMillis, ? ? ? ? final long timeoutMillis, ? ? ? ? final CommunicationMode communicationMode, ? ? ? ? final PullCallback pullCallback ? ? ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { ? ? ? ? // 查找MessageQueue應該從brokerName的哪個節(jié)點查詢 ? ? ? ? FindBrokerResult findBrokerResult = ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false); ? ? ? ? if (null == findBrokerResult) { ? ? ? ? ? ? this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); ? ? ? ? ? ? findBrokerResult = ? ? ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), ? ? ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false); ? ? ? ? } ? ? ? ? if (findBrokerResult != null) { ? ? ? ? ? ? { ? ? ? ? ? ? ? ? // check version ? ? ? ? ? ? ? ? if (!ExpressionType.isTagType(expressionType) ? ? ? ? ? ? ? ? ? ? && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { ? ? ? ? ? ? ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + ", " ? ? ? ? ? ? ? ? ? ? ? ? + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? int sysFlagInner = sysFlag; ? ? ? ? ? ? if (findBrokerResult.isSlave()) { ? ? ? ? ? ? ? ? sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); ? ? ? ? ? ? } ? ? ? ? ? ? PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); ? ? ? ? ? ? requestHeader.setConsumerGroup(this.consumerGroup); ? ? ? ? ? ? requestHeader.setTopic(mq.getTopic()); ? ? ? ? ? ? requestHeader.setQueueId(mq.getQueueId()); ? ? ? ? ? ? requestHeader.setQueueOffset(offset); ? ? ? ? ? ? requestHeader.setMaxMsgNums(maxNums); ? ? ? ? ? ? requestHeader.setSysFlag(sysFlagInner); ? ? ? ? ? ? requestHeader.setCommitOffset(commitOffset); ? ? ? ? ? ? requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); ? ? ? ? ? ? requestHeader.setSubscription(subExpression); ? ? ? ? ? ? requestHeader.setSubVersion(subVersion); ? ? ? ? ? ? requestHeader.setExpressionType(expressionType); ? ? ? ? ? ? String brokerAddr = findBrokerResult.getBrokerAddr(); ? ? ? ? ? ? if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { ? ? ? ? ? ? ? ? brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); ? ? ? ? ? ? } ? ? ? ? ? ? PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( ? ? ? ? ? ? ? ? brokerAddr, ? ? ? ? ? ? ? ? requestHeader, ? ? ? ? ? ? ? ? timeoutMillis, ? ? ? ? ? ? ? ? communicationMode, ? ? ? ? ? ? ? ? pullCallback); ? ? ? ? ? ? return pullResult; ? ? ? ? } ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); ? ? } ? ? public long recalculatePullFromWhichNode(final MessageQueue mq) { ? ? ? ? if (this.isConnectBrokerByUser()) { ? ? ? ? ? ? return this.defaultBrokerId; ? ? ? ? } ? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); ? ? ? ? if (suggest != null) { ? ? ? ? ? ? return suggest.get(); ? ? ? ? } ? ? ? ? return MixAll.MASTER_ID; ? ? } }
Consumer
拉取消息的時候會從 pullFromWhichNodeTable
中取出拉取 brokerId確定去具體的broker進行查詢。
到此這篇關于RocketMQ設計之主從復制和讀寫分離的文章就介紹到這了,更多相關RocketMQ從復制和讀寫分離內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Mybatis批量插入數(shù)據(jù)的兩種方式總結與對比
批量插入功能是我們日常工作中比較常見的業(yè)務功能之一,下面這篇文章主要給大家介紹了關于Mybatis批量插入數(shù)據(jù)的兩種方式總結與對比的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-01-01Java基礎之重載(Overload)與重寫(Override)詳解
這篇文章主要介紹了Java基礎之重載(Overload)與重寫(Override)詳解,文中有非常詳細的代碼示例,對正在學習java基礎的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04Java/Android 實現(xiàn)簡單的HTTP服務器
這篇文章主要介紹了Java/Android 如何實現(xiàn)簡單的HTTP服務器,幫助大家更好的進行功能測試,感興趣的朋友可以了解下2020-10-10MyBatis通過JDBC數(shù)據(jù)驅動生成的執(zhí)行語句問題
這篇文章主要介紹了MyBatis通過JDBC數(shù)據(jù)驅動生成的執(zhí)行語句問題的相關資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-08-08