欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ事務(wù)消息原理與使用詳解

 更新時間:2023年02月13日 11:41:23   作者:每天都要進步一點點  
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布式事務(wù)功能,通過事務(wù)消息能達到分布式事務(wù)的最終一致

一、RocketMQ事務(wù)消息概要

RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布式事務(wù)功能,通過事務(wù)消息能達到分布式事務(wù)的最終一致。

Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,采用了2PC(兩階段提交)+ 補償機制(事務(wù)狀態(tài)回查)的思想來實現(xiàn)了提交事務(wù)消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

我們可以看到,事務(wù)消息主要分為兩個流程:

(1)、正常事務(wù)消息的發(fā)送及提交

a、生產(chǎn)者發(fā)送half消息到Broker服務(wù)端(半消息);

半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費。當一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認時,該事務(wù)消息就處于"暫時不可被消費"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。

b、Broker服務(wù)端將消息持久化之后,給生產(chǎn)者響應(yīng)消息寫入結(jié)果(ACK響應(yīng));

c、生產(chǎn)者根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)邏輯(如果寫入失敗,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行);

d、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向Broker服務(wù)端提交二次確認(Commit 或是 Rollback),Broker服務(wù)端收到 Commit 狀態(tài)則將半事務(wù)消息標記為可投遞,訂閱方最終將收到該消息;Broker服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接收該消息;

(2)、事務(wù)消息的補償流程

a、在網(wǎng)絡(luò)閃斷或者是應(yīng)用重啟的情況下,可能導(dǎo)致生產(chǎn)者發(fā)送的二次確認消息未能到達Broker服務(wù)端,經(jīng)過固定時間后,Broker服務(wù)端將會對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)進行“回查”;

b、生產(chǎn)者收到回查消息后,檢查回查消息對應(yīng)的本地事務(wù)執(zhí)行的最終結(jié)果;

c、生產(chǎn)者根據(jù)本地事務(wù)狀態(tài),再次提交二次確認給Broker,然后Broker重新對半事務(wù)消息Commit或者Rollback;

其中,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況。

事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):

  • TransactionStatus.CommitTransaction:提交事務(wù),它允許消費者消費此消息。
  • TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。
  • TransactionStatus.Unknown:中間狀態(tài),它代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù)。

下面我們通過示例演示如何使用RocketMQ的事務(wù)消息。

二、RocketMQ事務(wù)消息使用案例

(1)、定義消息監(jiān)聽器

消息監(jiān)聽器主要是實現(xiàn)TransactionListener接口,然后需要重寫下面兩個方法:

  • executeLocalTransaction:執(zhí)行本地事務(wù);
  • checkLocalTransaction:回查本地事務(wù)狀態(tài),根據(jù)這次回查的結(jié)果來決定此次事務(wù)是提交還是回滾;
/**
 * 事務(wù)監(jiān)聽器,重寫執(zhí)行本地事務(wù)方法以及事務(wù)回查方法
 */
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String msgKey = msg.getKeys();
        switch (msgKey) {
            case "Num0":
            case "Num1":
                // 明確回復(fù)回滾操作,消息將會被刪除,不允許被消費。
                return LocalTransactionState.ROLLBACK_MESSAGE;
            case "Num8":
            case "Num9":
                // 消息無響應(yīng),代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù)
                return LocalTransactionState.UNKNOW;
            default:
                // 消息通過,允許消費者消費消息
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("回查本地事務(wù)狀態(tài),消息Key: " + msg.getKeys() + ",消息內(nèi)容: " + new String(msg.getBody()));
        // 需要根據(jù)業(yè)務(wù),查詢本地事務(wù)是否執(zhí)行成功,這里直接返回COMMIT
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(2)、定義消息生產(chǎn)者

事務(wù)消息的生產(chǎn)者跟我們之前的普通生產(chǎn)者的不同:

  • a、需創(chuàng)建事務(wù)類型的生產(chǎn)者TransactionMQProducer;
  • b、需調(diào)用setTransactionListener()方法設(shè)置事務(wù)監(jiān)聽器;
  • c、使用sendMessageInTransaction()以事務(wù)方式發(fā)送消息;
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 創(chuàng)建事務(wù)類型的生產(chǎn)者
        TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group");
        // 設(shè)置NameServer的地址
        producer.setNamesrvAddr("10.0.90.211:9876");
        // 設(shè)置事務(wù)監(jiān)聽器
        producer.setTransactionListener(new TransactionListenerImpl());
        // 啟動生產(chǎn)者
        producer.start();
        // 發(fā)送10條消息
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TransactionTopic", "", ("Hello RocketMQ Transaction Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 設(shè)置消息Key
                msg.setKeys("Num" + i);
                // 使用事務(wù)方式發(fā)送消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.println("sendResult = " + sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // 阻塞,目的是為了在消息發(fā)送完成后才關(guān)閉生產(chǎn)者
        Thread.sleep(10000);
        producer.shutdown();
    }
}

(3)、定義消息消費者

public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費者名稱
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開
        mqPushConsumer.setNamesrvAddr("10.0.90.211:9876");
        // 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
        // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用*
        mqPushConsumer.subscribe("TransactionTopic", "*");
        // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            // 監(jiān)聽類實現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù)
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = msgList.get(0);
                String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                System.out.println("消費者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body);
                // 標記該消息已經(jīng)被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        mqPushConsumer.start();
    }
}

(4)、觀察生產(chǎn)者控制臺輸出

通過控制臺可以看到,生產(chǎn)者成功發(fā)送10條消息,并且我們在事務(wù)監(jiān)聽器中針對message key為Num8、Num9這兩條消息返回UNKNOW狀態(tài),這樣RocketMQ就會執(zhí)行本地事務(wù)回查去確認本地事務(wù)執(zhí)行狀態(tài)【即執(zhí)行checkLocalTransaction()方法】。

sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E0E0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=9]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E300001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=10]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E400002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=11]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E650003, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=12]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E780004, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=13]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E880005, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=14]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E990006, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=15]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=16]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EC30008, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=17]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EE30009, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=18]
回查本地事務(wù)狀態(tài),消息Key: Num8,消息內(nèi)容: Hello RocketMQ Transaction Message8
回查本地事務(wù)狀態(tài),消息Key: Num9,消息內(nèi)容: Hello RocketMQ Transaction Message9

(5)、觀察消費者控制臺輸出

可以看到,消費者成功接收到8條消息,因為有2條消息,我們在執(zhí)行本地事務(wù)的時候,明確告訴RocketMQ進行回滾了,所以這2條消息不能被消費者進行消費。

消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932288, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931728, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004398, commitLogOffset=17304, bodyCRC=1033347556, reconsumeTimes=0, preparedTransactionOffset=16983, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num2, TRAN_MSG=true, CONSUME_START_TIME=1646898932329, UNIQ_KEY=AC6E00564F4018B4AAC231C40E400002, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 50], transactionId='AC6E00564F4018B4AAC231C40E400002'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message2
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932325, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931741, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000469A, commitLogOffset=18074, bodyCRC=1250988402, reconsumeTimes=0, preparedTransactionOffset=17753, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num3, TRAN_MSG=true, CONSUME_START_TIME=1646898932341, UNIQ_KEY=AC6E00564F4018B4AAC231C40E650003, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 51], transactionId='AC6E00564F4018B4AAC231C40E650003'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message3
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932344, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931758, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000499C, commitLogOffset=18844, bodyCRC=1425278161, reconsumeTimes=0, preparedTransactionOffset=18523, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num4, TRAN_MSG=true, CONSUME_START_TIME=1646898932359, UNIQ_KEY=AC6E00564F4018B4AAC231C40E780004, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 52], transactionId='AC6E00564F4018B4AAC231C40E780004'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message4
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932360, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931774, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004C9E, commitLogOffset=19614, bodyCRC=603141191, reconsumeTimes=0, preparedTransactionOffset=19293, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num5, TRAN_MSG=true, CONSUME_START_TIME=1646898932375, UNIQ_KEY=AC6E00564F4018B4AAC231C40E880005, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 53], transactionId='AC6E00564F4018B4AAC231C40E880005'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message5
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932377, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931801, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004FA0, commitLogOffset=20384, bodyCRC=989488637, reconsumeTimes=0, preparedTransactionOffset=20063, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num6, TRAN_MSG=true, CONSUME_START_TIME=1646898932402, UNIQ_KEY=AC6E00564F4018B4AAC231C40E990006, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 54], transactionId='AC6E00564F4018B4AAC231C40E990006'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message6
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932402, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931816, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F00000000000052A2, commitLogOffset=21154, bodyCRC=1308448107, reconsumeTimes=0, preparedTransactionOffset=20833, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num7, TRAN_MSG=true, CONSUME_START_TIME=1646898932441, UNIQ_KEY=AC6E00564F4018B4AAC231C40EB20007, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 55], transactionId='AC6E00564F4018B4AAC231C40EB20007'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message7
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900749, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935220, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000599B, commitLogOffset=22939, bodyCRC=709195884, reconsumeTimes=0, preparedTransactionOffset=22592, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num9, TRAN_MSG=true, CONSUME_START_TIME=1646898935835, UNIQ_KEY=AC6E00563BCC18B4AAC231C3930D0009, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 57], transactionId='AC6E00563BCC18B4AAC231C3930D0009'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message9
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900727, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935223, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000005B76, commitLogOffset=23414, bodyCRC=1564625146, reconsumeTimes=0, preparedTransactionOffset=22245, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num8, TRAN_MSG=true, CONSUME_START_TIME=1646898935839, UNIQ_KEY=AC6E00563BCC18B4AAC231C392F70008, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 56], transactionId='AC6E00563BCC18B4AAC231C392F70008'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message8

三、RocketMQ事務(wù)消息原理

設(shè)計思想

在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息。

如何實現(xiàn)事務(wù)回查?

Broker會啟動一個消息回查的定時任務(wù),定時從事務(wù)消息queue中讀取所有待反查的消息。針對每個需要反查的半消息,Broker會給對應(yīng)的Producer發(fā)一個要求執(zhí)行事務(wù)狀態(tài)反查的RPC請求。然后根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來決定這個半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來反查。最后,提交或者回滾事務(wù),將半消息標記為已處理狀態(tài)【將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)】。 如果是提交事務(wù),就把半消息從半消息隊列中復(fù)制到該消息真正的topic和queue中; 如果是回滾事務(wù),則什么都不做。

值得注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認回滾該消息。

四、RocketMQ事務(wù)消息使用限制

使用事務(wù)消息,有一些限制條件:

  • 事務(wù)消息不支持延時消息和批量消息;
  • 事務(wù)性消息可能不止一次被檢查或消費,所以消費者端需要做好消費冪等;
  • 為了避免單個消息被檢查太多次而導(dǎo)致半隊列消息累積,我們默認將單個消息的檢查次數(shù)限制為 15 次(即默認只會回查15次),我們可以通過 Broker 配置文件的 transactionCheckMax參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax ), 則 Broker 將丟棄此消息,并在默認情況下同時打印錯誤日志。用戶可以通過重寫 AbstractTransactionCheckListener 類來修改這個行為;
  • 事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時間長度之后被檢查。當發(fā)送事務(wù)消息時,用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數(shù)優(yōu)先于 transactionMsgTimeout 參數(shù);
  • 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機制。
  • 事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費者。

到此這篇關(guān)于RocketMQ事務(wù)消息原理與使用詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Springboot 項目讀取Resources目錄下的文件(推薦)

    Springboot 項目讀取Resources目錄下的文件(推薦)

    這篇文章主要介紹了Springboot 項目讀取Resources目錄下的文件,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • 深入淺析SPI機制在JDK與Spring?Boot中的應(yīng)用

    深入淺析SPI機制在JDK與Spring?Boot中的應(yīng)用

    SPI是一種使軟件框架或庫更加模塊化、可擴展和可維護的有效方法。通過遵循“開閉原則”,?SPI?確保了系統(tǒng)的穩(wěn)定性和靈活性,從而滿足了不斷變化的業(yè)務(wù)需求,這篇文章主要介紹了SPI機制在JDK與Spring?Boot中的應(yīng)用,需要的朋友可以參考下
    2023-09-09
  • SpringBoot 下在 yml 中的 logging 日志配置方法

    SpringBoot 下在 yml 中的 logging 日志配置方法

    logging 配置主要用于控制應(yīng)用程序的日志輸出行為,可以通過配置定制日志的格式、級別、輸出位置等,這篇文章主要介紹了SpringBoot 下在 yml 中的 logging 日志配置,需要的朋友可以參考下
    2024-06-06
  • Spring:spring-webmvc和spring-web有哪些區(qū)別

    Spring:spring-webmvc和spring-web有哪些區(qū)別

    這篇文章主要介紹了Spring:spring-webmvc和spring-web有哪些區(qū)別,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • Springboot深入講解nocos的整合與使用

    Springboot深入講解nocos的整合與使用

    Nacos?是阿里巴巴推出來的一個新開源項目,這是一個更易于構(gòu)建云原生應(yīng)用的動態(tài)服務(wù)發(fā)現(xiàn)、配置管理和服務(wù)管理平臺,在項目開發(fā)過程中,我們經(jīng)常使用nacos作為配置中心和注冊中心。本文章我們就從代碼層面研究下springboot是如何整合nacos使用的
    2022-07-07
  • SpringMVC實現(xiàn)文件上傳下載的全過程

    SpringMVC實現(xiàn)文件上傳下載的全過程

    對于上傳功能,我們在項目中是經(jīng)常會用到的,比如用戶注冊的時候,上傳用戶頭像,這個時候就會使用到上傳的功能,而對于下載使用場景也很常見,下面這篇文章主要給大家介紹了關(guān)于SpringMVC實現(xiàn)文件上傳下載的相關(guān)資料,需要的朋友可以參考下
    2022-01-01
  • 劍指Offer之Java算法習(xí)題精講排列與N叉樹

    劍指Offer之Java算法習(xí)題精講排列與N叉樹

    跟著思路走,之后從簡單題入手,反復(fù)去看,做過之后可能會忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化
    2022-03-03
  • SpringBoot動態(tài)導(dǎo)出word文檔實整教程(復(fù)制即可使用)

    SpringBoot動態(tài)導(dǎo)出word文檔實整教程(復(fù)制即可使用)

    在我們做項目的時候會需要把數(shù)據(jù)庫中的數(shù)據(jù)導(dǎo)出到word當中,下面這篇文章主要給大家介紹了關(guān)于SpringBoot動態(tài)導(dǎo)出word文檔實整教程的相關(guān)資料,文中的代碼復(fù)制即可使用,需要的朋友可以參考下
    2023-06-06
  • Junit 5中@ParameterizedTest與@EnumSource結(jié)合使用

    Junit 5中@ParameterizedTest與@EnumSource結(jié)合使用

    今天小編就為大家分享一篇關(guān)于Junit 5中@ParameterizedTest與@EnumSource結(jié)合使用,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • SpringBoot中swagger的使用

    SpringBoot中swagger的使用

    這篇文章主要介紹了SpringBoot中swagger的使用,文中有非常詳細的代碼示例,對正在學(xué)習(xí)swagger的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-05-05

最新評論