欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入淺出RocketMQ的事務(wù)消息

 更新時間:2023年04月09日 12:00:28   作者:小王曾是少年  
RocketMQ事務(wù)消息(Transactional?Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。本文主要介紹了RocketMQ事務(wù)消息的相關(guān)知識,需要的可以參考一下

事務(wù)消息發(fā)送流程

半消息實(shí)現(xiàn)了分布式環(huán)境下的數(shù)據(jù)一致性的處理,生產(chǎn)者發(fā)送事務(wù)消息的流程如上圖所示,通過對源碼的學(xué)習(xí),我們可以弄清楚下面幾點(diǎn),也是半消息機(jī)制的核心:

1.為什么prepare消息不會被Consumer消費(fèi)?

2.事務(wù)消息是如何提交和回滾的?

3.定時回查本地事務(wù)狀態(tài)的實(shí)現(xiàn)細(xì)節(jié)。

發(fā)送事務(wù)消息源碼分析

發(fā)送事務(wù)消息方法TransactionMQProducer.sendMessageInTransaction:

  • msg:消息
  • tranExecuter:本地事務(wù)執(zhí)行器
  • arg:本地事務(wù)執(zhí)行器參數(shù)
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 忽視消息延遲的屬性
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);
		
		// 發(fā)送半消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
		
		// 處理發(fā)送半消息的結(jié)果
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
        	// 發(fā)送半消息成功,執(zhí)行本地事務(wù)邏輯
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // 執(zhí)行本地事務(wù)邏輯
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // 發(fā)送半消息失敗,標(biāo)記本地事務(wù)狀態(tài)為回滾
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
		
		// 結(jié)束事務(wù),設(shè)置消息 COMMIT / ROLLBACK
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
		
		// 返回事務(wù)發(fā)送結(jié)果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        
        // 提取Prepared消息的uniqID
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

該方法的入?yún)幸粋€需要用戶實(shí)現(xiàn)本地事務(wù)的LocalTransactionExecuter executer,executer中會進(jìn)行事務(wù)操作以保證本地事務(wù)和消息發(fā)送這兩個操作的原子性。

由上面的源碼可知:

Producer會首先發(fā)送一個半消息到Broker中:

  • 半消息發(fā)送成功,執(zhí)行事務(wù)
  • 半消息發(fā)送失敗,不執(zhí)行事務(wù)

半消息發(fā)送到Broker后不會被Consumer消費(fèi)掉的原因有以下兩點(diǎn):

  • Broker在將消息寫入CommitLog時會判斷消息類型,如果是prepare或者rollback消息,ConsumeQueue的offset不變
  • Broker在構(gòu)造ConsumeQueue時會判斷是否是處于prepare或者rollback狀態(tài)的消息,如果是則不會將該消息放入ConsumeQueue里,Consumer在拉取消息時也就不會拉取到這條消息

Producer會根據(jù)半消息的發(fā)送結(jié)果和本地任務(wù)執(zhí)行結(jié)果來決定如何處理事務(wù)(commit或rollback),方法最后調(diào)用了endTransaction來處理事務(wù)的執(zhí)行結(jié)果,源碼如下:

  • sendResult:發(fā)送半消息的結(jié)果
  • localTransactionState:本地事務(wù)狀態(tài)
  • localException:執(zhí)行本地事務(wù)邏輯產(chǎn)生的異常
  • RemotingException:遠(yuǎn)程調(diào)用異常
  • MQBrokerException:Broker異常
  • InterruptedException:當(dāng)線程中斷異常
  • UnknownHostException:未知host異常
public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 解碼消息id
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

		// 創(chuàng)建請求
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

		// 提交 commit / rollback 消息 
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

該方法是將事務(wù)執(zhí)行的結(jié)果發(fā)送給Broker,再由Broker決定是否進(jìn)行消息投遞,執(zhí)行步驟如下:

1.收到消息后先檢查是否是事務(wù)消息,如果不是事務(wù)消息則直接返回

2.根據(jù)請求頭里的offset查詢半消息,如果查詢結(jié)果為空則直接返回

3.根據(jù)半消息構(gòu)造新消息,新構(gòu)造的消息會被重新寫入到CommitLog里,rollback消息的消息體為空

4.如果是rollback消息,則該消息不會被投遞

具體原因上文中已經(jīng)分析過:只有commit消息才會被Broker投遞給consumer

RocketMQ會將commit消息和rollback消息都寫入到commitLog里,但rollback消息的消息體為空且不會被投遞,CommitLog在刪除過期消息時才會將其刪除。當(dāng)事務(wù)commit成功之后,RocketMQ會重新封裝半消息并將其投遞給Consumer端消費(fèi)。

事務(wù)消息回查

Broker發(fā)起

相較于普通消息,事務(wù)消息主要依賴下面三個類:

1.TransactionStateService:事務(wù)狀態(tài)服務(wù),負(fù)責(zé)對事務(wù)消息進(jìn)行管理,包括存儲和更新事務(wù)消息狀態(tài)、回查狀態(tài)等

2.TranStateTable:事務(wù)消息狀態(tài)存儲表,基于MappedFileQueue實(shí)現(xiàn)

3.TranRedoLog:TranStateTable的日志,每次寫入操作都會記錄日志,當(dāng)Broker宕機(jī)時,可以利用這個文件做數(shù)據(jù)恢復(fù)

存儲半消息到CommitLog時,使用offset索引到對應(yīng)的TranStateTable的位置

到此這篇關(guān)于深入淺出RocketMQ的事務(wù)消息的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 一文了解為什么Java中只有值傳遞

    一文了解為什么Java中只有值傳遞

    Java?傳參是值傳遞還是引用傳遞?這個問題很基礎(chǔ),但是許多人都有點(diǎn)懵。本文就來通過一些示例帶大家詳細(xì)了解一下,需要的可以參考一下
    2022-07-07
  • Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解

    Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解

    在很多情況下,我們需要遍歷圖,得到圖的一些性質(zhì)。有關(guān)圖的搜索,最經(jīng)典的算法有深度優(yōu)先搜索和廣度優(yōu)先搜索,接下來我們分別講解這兩種搜索算法,需要的可以參考一下
    2022-11-11
  • SpringBoot使用SchedulingConfigurer實(shí)現(xiàn)多個定時任務(wù)多機(jī)器部署問題(推薦)

    SpringBoot使用SchedulingConfigurer實(shí)現(xiàn)多個定時任務(wù)多機(jī)器部署問題(推薦)

    這篇文章主要介紹了SpringBoot使用SchedulingConfigurer實(shí)現(xiàn)多個定時任務(wù)多機(jī)器部署問題,定時任務(wù)多機(jī)器部署解決方案,方式一拆分,單獨(dú)拆分出來,單獨(dú)跑一個應(yīng)用,方式二是基于aop攔截處理(搶占執(zhí)行),只要有一個執(zhí)行,其它都不執(zhí)行,需要的朋友可以參考下
    2023-01-01
  • 詳解Java內(nèi)存溢出的幾種情況

    詳解Java內(nèi)存溢出的幾種情況

    這篇文章主要介紹了詳解Java內(nèi)存溢出的幾種情況,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • JAVA中的字符串常量池使用操作代碼

    JAVA中的字符串常量池使用操作代碼

    Java中的字符串常量池是Java堆中的一塊特殊存儲區(qū)域,用于存儲字符串。它的實(shí)現(xiàn)是為了提高字符串操作的性能并節(jié)省內(nèi)存,這篇文章主要介紹了JAVA中的字符串常量池,需要的朋友可以參考下
    2022-12-12
  • java中HashSet的特點(diǎn)及實(shí)例用法

    java中HashSet的特點(diǎn)及實(shí)例用法

    在本篇文章里小編給大家整理的是一篇關(guān)于java中HashSet的特點(diǎn)及實(shí)例用法,有興趣的朋友們可以學(xué)習(xí)下。
    2021-04-04
  • Java?Synchronize底層原理總結(jié)

    Java?Synchronize底層原理總結(jié)

    這篇文章主要給大家總結(jié)了Java?Synchronize底層原理,文中的圖文講解介紹的非常詳細(xì),對我們學(xué)習(xí)Java?Synchronize有一定的幫助,需要的朋友可以參考下
    2023-06-06
  • Java運(yùn)行時jar終端輸出的中文日志亂碼兩種解決方式

    Java運(yùn)行時jar終端輸出的中文日志亂碼兩種解決方式

    jar包啟動,今天java開發(fā)過來找,說jar包啟動日志是亂碼,這篇文章主要給大家介紹了關(guān)于Java運(yùn)行時jar終端輸出的中文日志亂碼的兩種解決方式,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2024-01-01
  • Mybatis in條件傳參的三種實(shí)現(xiàn)方式(直接$,List,[])

    Mybatis in條件傳參的三種實(shí)現(xiàn)方式(直接$,List,[])

    這篇文章主要介紹了Mybatis in條件傳參的三種實(shí)現(xiàn)方式(直接$,List,[]),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Netty粘包拆包及使用原理詳解

    Netty粘包拆包及使用原理詳解

    Netty是由JBOSS提供的一個java開源框架,現(xiàn)為?Github上的獨(dú)立項(xiàng)目。Netty提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序,這篇文章主要介紹了Netty粘包拆包及使用原理
    2022-08-08

最新評論