欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ事務(wù)消息保證消息的可靠性和一致性

 更新時間:2023年04月23日 11:36:12   作者:Acqierement  
RocketMQ事務(wù)消息是一種能夠保證消息傳遞的可靠性和一致性的消息傳遞模式。它通過引入“半消息”和“事務(wù)狀態(tài)”機(jī)制,實現(xiàn)了消息發(fā)送和本地事務(wù)執(zhí)行的原子性,從而確保了消息的可靠性和一致性

這篇講解一下rocketMq的事務(wù)消息的原理

在發(fā)送事務(wù)消息的時候,會加一個標(biāo)識,表示這個消息是事務(wù)消息。broker接收到消息后,在我們之前看的代碼里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage會判斷是否是事務(wù)消息。

if (sendTransactionPrepareMessage) {
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

sendTransactionPrepareMessage=true表示是事務(wù)消息,所以走了一個單獨的邏輯。

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

這里parseHalfMessageInner這個方法里面開始了偷梁換柱,把topic和queueId都改了,把原本的信息先存在變量里面。所以實際上這個消息發(fā)到了半消息專有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

然后其他代碼還是和普通的消息一樣,就是把事務(wù)消息做了轉(zhuǎn)發(fā),存在了RMQ_SYS_TRANS_HALF_TOPIC里面。

到這里發(fā)送半消息就成功了,然后最后客戶端發(fā)送了半消息之后,會查一下本地事務(wù)的情況是否完成。這里有3種情況:commit、rollback、未知。完成和回滾都是確認(rèn)的狀態(tài),這個比較好處理,比較難的是未知。我們先看能得到確認(rèn)結(jié)果的情況。

如果完成和回滾,會給客戶端發(fā)送結(jié)束事務(wù)的消息,這個消息叫END_TRANSACTION,包括消息里面包括了之前發(fā)送的半消息的id和offset。

broker處理的代碼在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根據(jù)offset拿到半消息,然后如果是commit,就是把原本的topic和queueId還原,發(fā)到原本的隊列里面,這樣就可以正常消費了。然后把這個半消息“刪除”。如果是rollBack,也是拿到這個半消息,然后直接“刪除”就可以了。接下來看一下怎么“刪除”。

為什么我刪除會打引號呢?因為半消息其實就是跟正常的消息一樣,存在commitLog文件里面,mq的設(shè)計,就沒有刪除這個功能。所以所謂的刪除其實就是把這個消息消費掉,不做任何處理,就是刪除了。

想象一下,這個半消息有commit/rollBack/未知,3種狀態(tài),未知的肯定不能刪除,那他怎么知道哪些消息是可以刪除的呢?總不能所有的都再去客戶端查一下事務(wù)的結(jié)果吧?mq怎么做的呢?前面提到的刪除其實就是把這些commit和rollBack處理過后的半消息,再保存起來,后面消費半消息的數(shù)據(jù)的時候,只要從里面查一下是否需要刪除就可以了。

這里又有一個問題,怎么把需要刪除的半消息存起來呢?mq存儲數(shù)據(jù)就是commitLog,所以其實這些需要刪除的數(shù)據(jù),就是又發(fā)到了一個特定的topic里面。這個topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意區(qū)分,原本半消息的topic名字是half_topic,這個topic名字是op_half_topic,存儲的是處理過后,可以刪除的半消息。

所以說前面提到的帶引號的“刪除”,就是把消息發(fā)到op_half_topic就表示是刪除了,這個op_half_topic消息的內(nèi)容就是half_topic的offset。那么現(xiàn)在需要有個地方,來消費half_topic,然后判斷是否存在于op_half_topic,如果是表示可以刪除了,如果不是,就接著保存起來。

處理邏輯就在TransactionalMessageCheckService這個定時任務(wù)中。具體是在TransactionalMessageServiceImpl#check方法里面

    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 先拿到半消息
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 拿到半消息的最小偏移量
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 拿到op_half的最小偏移量
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // 拉取op的消息(32條),op消息內(nèi)容是half的offset,跟half_topic的最小offset比較,如果op的小于最小的,就說明已經(jīng)處理過了,放在doneOpOffset,反之,則說明還沒處理過,就先放在removeMap里面
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                // 然后對half_topic進(jìn)行處理
                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果這個offset已經(jīng)處理過了,就接著處理下一個
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // 如果沒有處理過,就要把數(shù)據(jù)撈出來重新投遞
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;
                        if (isNeedCheck) {
                            // 重新投遞
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 再重新確認(rèn)事務(wù)
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                // 更新offset
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }

我講解一下這個代碼做了啥。我們先明確這個代碼是要實現(xiàn)什么功能。就是消費half_topic,然后去根據(jù)op_half_topic的數(shù)據(jù)來判斷half_topc的消息是否被處理過,處理過了就直接忽略、丟棄,如果沒有處理過,就“保留”這個消息,等待后面事務(wù)確認(rèn)了再處理。

這里“保留”我也是加了引號,因為mq消費是一條一條按順序消費,如果中間有一個數(shù)據(jù)卡住了,后面數(shù)據(jù)就沒法消費了。所以這里“保留”,其實也是消費了,只是他消費到了不確定結(jié)果的消息,他是重新投遞到了half_topic,來實現(xiàn)“保留”的目的。

好了,明確了這個代碼實現(xiàn)的功能,我們來一步步看一下細(xì)節(jié)。

首先是拿到half_topic和op_half_topic的offset,知道現(xiàn)在是消費到了哪里。然后去拉取op_half_topic,每次32條,op_half消息內(nèi)容存的是half_topic的offset,只要判斷這條op_half里面的offset小于half_topic的offset,就表示已經(jīng)消費過了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示還沒消費,放入removeMap,就表示這個半消息可以放心刪除了。

這一步,通過消費op_half,跟half_topic的minOffset做比較,構(gòu)建了doneOpOffset,和removeMap。

然后就是消費half_topic的消息,只要判斷每條消息的offset是否在removeMap中,就表示可以刪除,放入doneOpOffset中,直接消費下一條數(shù)據(jù),所以這里其實也不用真的拉取half_topic的消息,只要用offset來判斷就行,消費過了,offset+1,就可以去判斷下一條消息。

如果half_topic的offset沒有在removeMap中,就表示暫時還不知道結(jié)果,這時候就重新發(fā)送到half_topic,重新投遞之后,然后給客戶端發(fā)送一個檢查事務(wù)的請求,客戶端檢測過后,還是用之前的END_TRANSACTION命令,再發(fā)給broker,broker就會放到op_half里面,等于就是重新發(fā)了一個半消息的流程,實現(xiàn)了閉環(huán)。

最后就是更新兩個topic的offset了。之前的doneOpOffset保存下來,就是為了更新op_half的offset,只有都處理過了,才會更新,如果中間有一個沒有處理,就會阻塞在那條消息。

總結(jié):

所以現(xiàn)在的情況是這樣的,對于half_topic的半消息如果有結(jié)果就忽略,如果沒有結(jié)果就重新投遞,不會阻塞,所以half_topic的offset會一直往后更新。但是op_half要等所有的都done了,才會更新offset。假設(shè)一種情況,如果op_offset1對于的是half_offset1這個消息,然后half_offset1剛好被消費,重新投遞了。這是op_offset1找不到對應(yīng)的半消息,所以不會被消費。但是不會被卡主,等到下次的時候,op_offset1這個數(shù)據(jù)的offset已經(jīng)小于half_offset1這個消息的offset,所以這個op_offset1也會當(dāng)做已經(jīng)處理過了。

可以看到整個過程其實很巧妙,大家可以結(jié)合代碼捋一捋。

到此這篇關(guān)于RocketMQ事務(wù)消息保證消息的可靠性和一致性的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java Spring開發(fā)環(huán)境搭建及簡單入門示例教程

    Java Spring開發(fā)環(huán)境搭建及簡單入門示例教程

    這篇文章主要介紹了Java Spring開發(fā)環(huán)境搭建及簡單入門示例,結(jié)合實例形式分析了spring環(huán)境搭建、配置、使用方法及相關(guān)注意事項,需要的朋友可以參考下
    2017-11-11
  • Java使用Junit4.jar進(jìn)行單元測試的方法

    Java使用Junit4.jar進(jìn)行單元測試的方法

    今天通過本文給大家介紹Java使用Junit4.jar進(jìn)行單元測試的方法,本文通過圖文實例相結(jié)合給大家介紹的非常詳細(xì),需要的朋友參考下吧
    2021-11-11
  • Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建教程

    Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建教程

    這篇文章主要給大家介紹了關(guān)于Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起看看吧
    2018-09-09
  • Java?循環(huán)隊列/環(huán)形隊列的實現(xiàn)流程

    Java?循環(huán)隊列/環(huán)形隊列的實現(xiàn)流程

    循環(huán)隊列又叫環(huán)形隊列,是一種特殊的隊列。循環(huán)隊列解決了隊列出隊時需要將所有數(shù)據(jù)前移一位的問題。本文將帶大家詳細(xì)了解循環(huán)隊列如何實現(xiàn),需要的朋友可以參考一下
    2022-02-02
  • lombok插件無法使用的原因及解決方案

    lombok插件無法使用的原因及解決方案

    這篇文章主要介紹了lombok插件無法使用的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Spring注解之@Import的簡單介紹

    Spring注解之@Import的簡單介紹

    @Import是Spring基于Java注解配置的主要組成部分,下面這篇文章主要給大家介紹了關(guān)于Spring注解之@Import的簡單介紹,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-12-12
  • Mybatis中使用updateBatch進(jìn)行批量更新

    Mybatis中使用updateBatch進(jìn)行批量更新

    這篇文章主要介紹了Mybatis中使用updateBatch進(jìn)行批量更新的相關(guān)資料,有逐條更新,sql批量更新等,具體實例代碼大家參考下本文
    2018-04-04
  • Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的方法

    Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的方法

    這篇文章主要給大家介紹了關(guān)于Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Spring boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • iOS獲取AppIcon and LaunchImage''s name(app圖標(biāo)和啟動圖片名字)

    iOS獲取AppIcon and LaunchImage''s name(app圖標(biāo)和啟動圖片名字)

    這篇文章主要介紹了iOS獲取AppIcon and LaunchImage's name(app圖標(biāo)和啟動圖片名字)的相關(guān)資料,非常不錯,具有參考借鑒價值,感興趣的朋友一起學(xué)習(xí)吧
    2016-08-08
  • 圖解Spring Security 中用戶是如何實現(xiàn)登錄的

    圖解Spring Security 中用戶是如何實現(xiàn)登錄的

    這篇文章主要介紹了圖解Spring Security 中用戶是如何實現(xiàn)登錄的,文中通過示例代碼和圖片介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07

最新評論