欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ設計之主從復制和讀寫分離

 更新時間:2022年03月21日 10:46:35   作者:周杰倫本人  
這篇文章主要介紹了RocketMQ設計之主從復制和讀寫分離,RocketMQ提高消費避免Broker發(fā)生單點故障引起B(yǎng)roker上的消息無法及時消費,下文關于了RocketMQ的相關內容,需要的小伙伴可以參考一下

一、主從復制

RocketMQ為了提高消費的高可用性,避免Broker發(fā)生單點故障引起B(yǎng)roker上的消息無法及時消費,同時避免單個機器上硬盤壞損出現(xiàn)消費數(shù)據(jù)丟失。

RocketMQ采用Broker數(shù)據(jù)主從復制機制,當消息發(fā)送到Master服務器后會將消息同步到Slave服務器,如果Master服務器宕機,消息消費者還可以繼續(xù)從Slave拉取消息。

消息從Master服務器復制到Slave服務器上,有兩種復制方式:同步復制SYNC_MASTER和異步復制ASYNC_MASTER。

通過配置文件conf/broker.conf文件配置:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. ?See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. ?You may obtain a copy of the License at
#
# ? ? http://www.apache.org/licenses/LICENSE-2.0
#
# ?Unless required by applicable law or agreed to in writing, software
# ?distributed under the License is distributed on an "AS IS" BASIS,
# ?WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# ?See the License for the specific language governing permissions and
# ?limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

對brokerRole參數(shù)進行設置:

同步復制:Master和Slave都寫成功后才返回客戶端寫成功的狀態(tài)。

  • 優(yōu)點:Master服務器出現(xiàn)故障,Slave服務器上有全部數(shù)據(jù)的備份,很容易恢復到Master服務器。
  • 缺點:由于多了一個同步等待的步驟,增加數(shù)據(jù)寫入延遲,降低系統(tǒng)吞吐量。

異步復制:僅Master服務器寫成功即可返回給客戶端寫成功的狀態(tài)。

  • 優(yōu)點:沒有同步等待的步驟,低延遲,高吞吐。
  • 缺點:如果Master服務器出現(xiàn)故障,有些數(shù)據(jù)可能未寫入Slave服務器,未同步的數(shù)據(jù)可能丟失

實際應用中,需要結合業(yè)務場景,合理設置刷盤方式和主從復制方式。不建議使用同步刷盤方式,因為它頻繁觸發(fā)寫磁盤操作,性能下降很明顯。**通常把MasterSlave設置為異步刷盤,同步復制,保證數(shù)據(jù)不丟失。**這樣即使一臺服務器出故障,仍然可以保證數(shù)據(jù)不丟失。

二、讀寫分離

讀寫分離機制是高性能、高可用架構中常見的設計,例如Mysql實現(xiàn)讀寫分離機制,Client只能從Master服務器寫數(shù)據(jù),可以從Master服務器和Slave服務器都讀數(shù)據(jù)。

RocketMQ的Consumer在拉取消息時,Broker會判斷Master服務器的消息堆積量來決定Consumer是否從Slave服務器拉取消息消費。默認一開始從Master服務器拉群消息,如果Master服務器的消息堆積超過物理內存40%,則會返回給Consumer的消息結果并告知Consumer,下次從其他Slave服務器上拉取消息。

RocketMQ 有屬于自己的一套讀寫分離邏輯,會判斷主服務器的消息堆積量來決定消費者是否向從服務器拉取消息消費。

Consumer在向 Broker 發(fā)送消息拉取請求時,會根據(jù)篩選出來的消息隊列,判定是從Master,還是從Slave拉取消息,默認是Master。

Broker 接收到消息消費者拉取請求,在獲取本地堆積的消息量后,會計算服務器的消息堆積量是否大于物理內存的一定值,如果是,則標記下次從 Slave服務器拉取,計算 Slave服務器的 Broker Id,并響應給消費者。

Consumer在接收到 Broker的響應后,會把消息隊列與建議下一次拉取節(jié)點的 Broker Id 關聯(lián)起來,并緩存在內存中,以便下次拉取消息時,確定從哪個節(jié)點發(fā)送請求。

public class GetMessageResult {

? ? private final List<SelectMappedBufferResult> messageMapedList =
? ? ? ? new ArrayList<SelectMappedBufferResult>(100);
? ? private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
? ? private GetMessageStatus status;
? ? private long nextBeginOffset;
? ? private long minOffset;
? ? private long maxOffset;
? ? private int bufferTotalSize = 0;
? ? // 標識是否通過Slave拉拉取消息
? ? private boolean suggestPullingFromSlave = false;
? ? private int msgCount4Commercial = 0;
}

// 針對消息堆積量過大會切換到Slave進行查詢。
// maxOffsetPy 為當前最大物理偏移量,maxPhyOffsetPulling 為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量。
// TOTAL_PHYSICAL_MEMORY_SIZE 表示當前系統(tǒng)物理內存,accessMessageInMemoryMaxRatio 的默認值為 40,
// 以上邏輯即可算出當前消息堆積量是否大于物理內存的 40%,如果大于則將 suggestPullingFromSlave 設置為 true。

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
? ? * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • 決定消費者是否向從服務器拉取消息消費的值存在 GetMessageResult 類中。
  • suggestPullingFromSlave的默認值為 false,即默認消費者不會消費從服務器,但它會在消費者發(fā)送消息拉取請求時,動態(tài)改變該值,Broker 接收、處理消費者拉取消息請求。
  • 針對本MessageQueue消息堆積量過大會切換到Slave進行查詢,maxOffsetPy 為當前最大物理偏移量,maxPhyOffsetPulling 為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量,當前消息堆積量是否大于物理內存的 40%就會切換到Slave進行查詢。
public class PullMessageResponseHeader implements CommandCustomHeader {
? ? // suggestWhichBrokerId標識從哪個broker進行查詢
? ? private Long suggestWhichBrokerId;
? ? private Long nextBeginOffset;
? ? private Long minOffset;
? ? private Long maxOffset;
}


public class PullMessageProcessor implements NettyRequestProcessor {

? ? private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
? ? ? ? throws RemotingCommandException {
? ? ? ? RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
? ? ? ? final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
? ? ? ? final PullMessageRequestHeader requestHeader =
? ? ? ? ? ? (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

? ? ? ? response.setOpaque(request.getOpaque());

? ? ? ? final GetMessageResult getMessageResult =
? ? ? ? ? ? this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
? ? ? ? ? ? ? ? requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

? ? ? ? if (getMessageResult != null) {
? ? ? ? ? ? response.setRemark(getMessageResult.getStatus().name());
? ? ? ? ? ? responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
? ? ? ? ? ? responseHeader.setMinOffset(getMessageResult.getMinOffset());
? ? ? ? ? ? responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

? ? ? ? ? ? // 建議從slave消費消息
? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) {
? ? ? ? ? ? ? ? // 從slave查詢
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? // 從master查詢
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? }

? ? ? ? ? ? switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
? ? ? ? ? ? ? ? case ASYNC_MASTER:
? ? ? ? ? ? ? ? case SYNC_MASTER:
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? case SLAVE:
? ? ? ? ? ? ? ? ? ? // 針對SLAVE需要判斷是否可讀,不可讀的情況下讀MASTER
? ? ? ? ? ? ? ? ? ? if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
? ? ? ? ? ? ? ? ? ? ? ? response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
? ? ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }

? ? ? ? ? ? if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
? ? ? ? ? ? ? ? // consume too slow ,redirect to another machine
? ? ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) {
? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // consume ok
? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? return response;
? ? }
}

PullMessageResponseHeadersuggestWhichBrokerId標識某個MessageQueue的消息從具體的brokerId進行查詢。
針對Slave不可讀的情況會設置為從MASTER_ID進行查詢。

public class PullAPIWrapper {
? ? private final InternalLogger log = ClientLogger.getLog();
? ? private final MQClientInstance mQClientFactory;
? ? private final String consumerGroup;
? ? private final boolean unitMode;
? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
? ? private volatile boolean connectBrokerByUser = false;
? ? private volatile long defaultBrokerId = MixAll.MASTER_ID;
? ? private Random random = new Random(System.currentTimeMillis());
? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

? ? public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
? ? ? ? final SubscriptionData subscriptionData) {
? ? ? ? PullResultExt pullResultExt = (PullResultExt) pullResult;

? ? ? ? // 處理MessageQueue對應拉取的brokerId
? ? ? ? this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

? ? ? ? // 省略相關代碼

? ? ? ? pullResultExt.setMessageBinary(null);

? ? ? ? return pullResult;
? ? }

? ? public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
? ? ? ? // 保存在pullFromWhichNodeTable對象中
? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
? ? ? ? if (null == suggest) {
? ? ? ? ? ? this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
? ? ? ? } else {
? ? ? ? ? ? suggest.set(brokerId);
? ? ? ? }
? ? }
}

Consumer收到拉取響應回來的數(shù)據(jù)后,會將下次建議拉取的 brokerId緩存起來。

public class PullAPIWrapper {
? ? private final InternalLogger log = ClientLogger.getLog();
? ? private final MQClientInstance mQClientFactory;
? ? private final String consumerGroup;
? ? private final boolean unitMode;
? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
? ? private volatile boolean connectBrokerByUser = false;
? ? private volatile long defaultBrokerId = MixAll.MASTER_ID;
? ? private Random random = new Random(System.currentTimeMillis());
? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

? ? public PullResult pullKernelImpl(
? ? ? ? final MessageQueue mq,
? ? ? ? final String subExpression,
? ? ? ? final String expressionType,
? ? ? ? final long subVersion,
? ? ? ? final long offset,
? ? ? ? final int maxNums,
? ? ? ? final int sysFlag,
? ? ? ? final long commitOffset,
? ? ? ? final long brokerSuspendMaxTimeMillis,
? ? ? ? final long timeoutMillis,
? ? ? ? final CommunicationMode communicationMode,
? ? ? ? final PullCallback pullCallback
? ? ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

? ? ? ? // 查找MessageQueue應該從brokerName的哪個節(jié)點查詢
? ? ? ? FindBrokerResult findBrokerResult =
? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false);

? ? ? ? if (null == findBrokerResult) {
? ? ? ? ? ? this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
? ? ? ? ? ? findBrokerResult =
? ? ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
? ? ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false);
? ? ? ? }

? ? ? ? if (findBrokerResult != null) {
? ? ? ? ? ? {
? ? ? ? ? ? ? ? // check version
? ? ? ? ? ? ? ? if (!ExpressionType.isTagType(expressionType)
? ? ? ? ? ? ? ? ? ? && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
? ? ? ? ? ? ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
? ? ? ? ? ? ? ? ? ? ? ? + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? int sysFlagInner = sysFlag;

? ? ? ? ? ? if (findBrokerResult.isSlave()) {
? ? ? ? ? ? ? ? sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
? ? ? ? ? ? }

? ? ? ? ? ? PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
? ? ? ? ? ? requestHeader.setConsumerGroup(this.consumerGroup);
? ? ? ? ? ? requestHeader.setTopic(mq.getTopic());
? ? ? ? ? ? requestHeader.setQueueId(mq.getQueueId());
? ? ? ? ? ? requestHeader.setQueueOffset(offset);
? ? ? ? ? ? requestHeader.setMaxMsgNums(maxNums);
? ? ? ? ? ? requestHeader.setSysFlag(sysFlagInner);
? ? ? ? ? ? requestHeader.setCommitOffset(commitOffset);
? ? ? ? ? ? requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
? ? ? ? ? ? requestHeader.setSubscription(subExpression);
? ? ? ? ? ? requestHeader.setSubVersion(subVersion);
? ? ? ? ? ? requestHeader.setExpressionType(expressionType);

? ? ? ? ? ? String brokerAddr = findBrokerResult.getBrokerAddr();
? ? ? ? ? ? if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
? ? ? ? ? ? ? ? brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
? ? ? ? ? ? }

? ? ? ? ? ? PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
? ? ? ? ? ? ? ? brokerAddr,
? ? ? ? ? ? ? ? requestHeader,
? ? ? ? ? ? ? ? timeoutMillis,
? ? ? ? ? ? ? ? communicationMode,
? ? ? ? ? ? ? ? pullCallback);

? ? ? ? ? ? return pullResult;
? ? ? ? }

? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
? ? }


? ? public long recalculatePullFromWhichNode(final MessageQueue mq) {
? ? ? ? if (this.isConnectBrokerByUser()) {
? ? ? ? ? ? return this.defaultBrokerId;
? ? ? ? }

? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
? ? ? ? if (suggest != null) {
? ? ? ? ? ? return suggest.get();
? ? ? ? }

? ? ? ? return MixAll.MASTER_ID;
? ? }
}

Consumer拉取消息的時候會從 pullFromWhichNodeTable 中取出拉取 brokerId確定去具體的broker進行查詢。

到此這篇關于RocketMQ設計之主從復制和讀寫分離的文章就介紹到這了,更多相關RocketMQ從復制和讀寫分離內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 一文帶你初識java中的String類

    一文帶你初識java中的String類

    String代表字符串,Java 程序中的所有字符串字面值(如 "abc" )都作為此類的實例實現(xiàn),這篇文章主要給大家介紹了關于java中String類的相關資料,需要的朋友可以參考下
    2021-10-10
  • Mybatis批量插入數(shù)據(jù)的兩種方式總結與對比

    Mybatis批量插入數(shù)據(jù)的兩種方式總結與對比

    批量插入功能是我們日常工作中比較常見的業(yè)務功能之一,下面這篇文章主要給大家介紹了關于Mybatis批量插入數(shù)據(jù)的兩種方式總結與對比的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2023-01-01
  • 淺談Java絕對布局

    淺談Java絕對布局

    這篇文章主要介紹了Java當中的絕對布局,還舉了一個簡單的實例,需要的朋友可以參考下。
    2017-08-08
  • Java集合Set的簡單使用解析

    Java集合Set的簡單使用解析

    這篇文章主要介紹了Java集合Set的簡單使用解析,Set接口是Collection的子接口,Set接口相較于Collection接口沒有提供額外的方法,Set 集合不允許包含相同的元素,如果試把兩個相同的元素加入同一個 Set 集合中,則添加操作失敗,需要的朋友可以參考下
    2023-11-11
  • MyBatis配置與CRUD超詳細講解

    MyBatis配置與CRUD超詳細講解

    這篇文章主要介紹了MyBatis配置與CRUD,CRUD是指在做計算處理時的增加(Create)、讀取(Read)、更新(Update)和刪除(Delete)幾個單詞的首字母簡寫。CRUD主要被用在描述軟件系統(tǒng)中數(shù)據(jù)庫或者持久層的基本操作功能
    2023-02-02
  • Java基礎之重載(Overload)與重寫(Override)詳解

    Java基礎之重載(Overload)與重寫(Override)詳解

    這篇文章主要介紹了Java基礎之重載(Overload)與重寫(Override)詳解,文中有非常詳細的代碼示例,對正在學習java基礎的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-04-04
  • Java/Android 實現(xiàn)簡單的HTTP服務器

    Java/Android 實現(xiàn)簡單的HTTP服務器

    這篇文章主要介紹了Java/Android 如何實現(xiàn)簡單的HTTP服務器,幫助大家更好的進行功能測試,感興趣的朋友可以了解下
    2020-10-10
  • mybatis返回的map結果如何設置有序

    mybatis返回的map結果如何設置有序

    這篇文章主要介紹了mybatis返回的map結果如何設置有序,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java超詳細介紹抽象類與接口的使用

    Java超詳細介紹抽象類與接口的使用

    在類中沒有包含足夠的信息來描繪一個具體的對象,這樣的類稱為抽象類,接口是Java中最重要的概念之一,它可以被理解為一種特殊的類,不同的是接口的成員沒有執(zhí)行體,是由全局常量和公共的抽象方法所組成,本文給大家介紹Java抽象類和接口,感興趣的朋友一起看看吧
    2022-05-05
  • MyBatis通過JDBC數(shù)據(jù)驅動生成的執(zhí)行語句問題

    MyBatis通過JDBC數(shù)據(jù)驅動生成的執(zhí)行語句問題

    這篇文章主要介紹了MyBatis通過JDBC數(shù)據(jù)驅動生成的執(zhí)行語句問題的相關資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2016-08-08

最新評論