欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成RocketMQ發(fā)送事務(wù)消息的原理解析

 更新時間:2022年06月30日 15:48:40   作者:劍圣無痕  
RocketMQ 的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致,這篇文章主要介紹了SpringBoot集成RocketMQ發(fā)送事務(wù)消息,需要的朋友可以參考下

簡介

RocketMQ 事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ 的事務(wù)消息提供類似 X/Open XA 的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

原理

RocketMQ事務(wù)消息通過異步確保方式,保證事務(wù)的最終一致性。設(shè)計的思想可以借鑒兩個階段提交事務(wù)。其執(zhí)行流程圖如下:

圖片.png

  • 發(fā)送方向MQ服務(wù)端發(fā)送消息。
  • MQ Server將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半消息。
  • 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
  • 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
  • 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時間后 MQ Server 將對該消息發(fā)起消息回查。
  • 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  • 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對半消息進(jìn)行操作。

具體實現(xiàn)

消費者

@Component
public class TransactionProduce
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendTransactionMessage(String msg)
    {
        logger.info("start sendTransMessage hashKey:{}",msg);
       
         Message message =new Message();
         message.setBody("this is tx message".getBytes());
         TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", 
                 MessageBuilder.withPayload(message).build(), msg);
         
         //發(fā)送狀態(tài)
         String sendStatus = result.getSendStatus().name();
         // 本地事務(wù)執(zhí)行狀態(tài)
         String localTxState = result.getLocalTransactionState().name();
         logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
    } 
}

說明:發(fā)送事務(wù)消息采用的是sendMessageInTransaction方法,返回結(jié)果為TransactionSendResult對象,該對象中包含了事務(wù)發(fā)送的狀態(tài)、本地事務(wù)執(zhí)行的狀態(tài)等。

消費者

@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(String message)
    {
        logger.info("send transaction mssage parma is:{}", message);
    }
}

說明:發(fā)送事務(wù)消息的消費者與普通的消費者一樣沒有太大的區(qū)別。

生產(chǎn)者消息監(jiān)聽器

發(fā)送事務(wù)消息除了生產(chǎn)者和消費者以外,我們還需要創(chuàng)建生產(chǎn)者的消息監(jiān)聽器,來監(jiān)聽本地事務(wù)執(zhí)行的狀態(tài)和檢查本地事務(wù)狀態(tài)。

@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 執(zhí)行本地事務(wù)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //處理業(yè)務(wù)
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

    /**
     * 檢查本地事務(wù)的狀態(tài)
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
    {
        logger.info("start check Local rocketMQ transaction");
        
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("check trans msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            resultState  = RocketMQLocalTransactionState.ROLLBACK;
        }
        return resultState;
    }
}

說明:RocketMQ本地事務(wù)狀態(tài)由如下幾種:

  • RocketMQLocalTransactionState.COMMIT:提交事務(wù),允許消費者消費此消息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滾事務(wù),消息將被刪除,不允許被消費。
  • RocketMQLocalTransactionState.UNKNOWN:中間狀態(tài),代表需要進(jìn)行檢查來確定狀態(tài)。

注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已經(jīng)沒有了txProducerGroup屬性,且sendMessageInTransaction方法也將其移除。所以在同一項目中只能有一個@RocketMQTransactionListener,不能出現(xiàn)多個,否則會報如下錯誤:

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

消息事務(wù)測試

正常測試

c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}

說明:通過日志我們可以看出,執(zhí)行的流程與上述的一致,執(zhí)行成功后,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為COMMIT_MESSAGE。

異常測試

如果在執(zhí)行本地消息時出現(xiàn)異常,那么執(zhí)行結(jié)果會是怎樣?修改下本地事務(wù)執(zhí)行的方法,讓其出現(xiàn)異常。

代碼調(diào)整

  @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //處理業(yè)務(wù)
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke local transaction msg content:{}",jsonStr);
             int c=1/0;
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

執(zhí)行結(jié)果

c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW

從執(zhí)行的結(jié)果可以看出,消息執(zhí)行成功返回的結(jié)果為SEND_OK,本地事務(wù)執(zhí)行的狀態(tài)為:UNKNOW.所以消費端無法消費此消息。

總結(jié)

到此這篇關(guān)于SpringBoot集成RocketMQ發(fā)送事務(wù)消息的文章就介紹到這了,更多相關(guān)SpringBoot集成RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java使用Rxtx實現(xiàn)串口通信調(diào)試工具

    java使用Rxtx實現(xiàn)串口通信調(diào)試工具

    這篇文章主要為大家詳細(xì)介紹了java使用Rxtx實現(xiàn)簡單串口通信調(diào)試工具,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-12-12
  • SpringBoot業(yè)務(wù)邏輯異常的處理方法介紹

    SpringBoot業(yè)務(wù)邏輯異常的處理方法介紹

    本篇文章為大家展示了如何在SpringBoot中統(tǒng)一處理邏輯異常,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲
    2022-09-09
  • Java中如何檢查數(shù)組是否包含某整數(shù)

    Java中如何檢查數(shù)組是否包含某整數(shù)

    這篇文章主要介紹了在?Java?中檢查數(shù)組是否包含某整數(shù),在本文中,我們使用了幾個內(nèi)置的方法,如anyMatch()、contains()、binarySearch()等,我們將在給定的數(shù)組中找到一個值,結(jié)合示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2023-05-05
  • IDEA 2020版本最新破解教程可激活至2089年(推薦)

    IDEA 2020版本最新破解教程可激活至2089年(推薦)

    這篇文章主要介紹了IDEA 2020版本最新破解教程可激活至2089年,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-09-09
  • springboot實現(xiàn)rabbitmq的隊列初始化和綁定

    springboot實現(xiàn)rabbitmq的隊列初始化和綁定

    這篇文章主要介紹了springboot實現(xiàn)rabbitmq的隊列初始化和綁定,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-10-10
  • SprintBoot深入淺出講解場景啟動器Starter

    SprintBoot深入淺出講解場景啟動器Starter

    本篇文章將和大家分享一下 Spring Boot 框架中的 Starters 場景啟動器的內(nèi)容,關(guān)于 Starters 具體是用來做什么的,以及在開發(fā) Spring Boot項目前,要如何自定義一個 Starters 場景啟動器
    2022-06-06
  • 使用gRPC微服務(wù)的內(nèi)部通信優(yōu)化

    使用gRPC微服務(wù)的內(nèi)部通信優(yōu)化

    這篇文章主要為大家介紹了微服務(wù)優(yōu)化之使用gRPC做微服務(wù)的內(nèi)部通信,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-03-03
  • 談?wù)勗贘ava發(fā)送郵件中遇到的的問題

    談?wù)勗贘ava發(fā)送郵件中遇到的的問題

    本文介紹了在利用Java發(fā)送郵件過程中遇到的的兩個問題,以及如何解決這兩個問題。如果大家也遇到了這些問題,可以來參考借鑒。
    2016-08-08
  • SpringBoot集成Tomcat服務(wù)架構(gòu)配置

    SpringBoot集成Tomcat服務(wù)架構(gòu)配置

    這篇文章主要為大家介紹了SpringBoot集成Tomcat服務(wù)架構(gòu)配置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-02-02
  • 解決IDEA中多模塊下Mybatis逆向工程不生成相應(yīng)文件的情況

    解決IDEA中多模塊下Mybatis逆向工程不生成相應(yīng)文件的情況

    這篇文章主要介紹了解決IDEA中多模塊下Mybatis逆向工程不生成相應(yīng)文件的情況,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評論