RocketMQ事務(wù)消息原理與使用詳解
一、RocketMQ事務(wù)消息概要
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時成功,要么同時失敗。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布式事務(wù)功能,通過事務(wù)消息能達到分布式事務(wù)的最終一致。
Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,采用了2PC(兩階段提交)+ 補償機制(事務(wù)狀態(tài)回查)的思想來實現(xiàn)了提交事務(wù)消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
我們可以看到,事務(wù)消息主要分為兩個流程:
(1)、正常事務(wù)消息的發(fā)送及提交
a、生產(chǎn)者發(fā)送half消息到Broker服務(wù)端(半消息);
半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費。當一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認時,該事務(wù)消息就處于"暫時不可被消費"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
b、Broker服務(wù)端將消息持久化之后,給生產(chǎn)者響應(yīng)消息寫入結(jié)果(ACK響應(yīng));
c、生產(chǎn)者根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)邏輯(如果寫入失敗,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行);
d、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向Broker服務(wù)端提交二次確認(Commit 或是 Rollback),Broker服務(wù)端收到 Commit 狀態(tài)則將半事務(wù)消息標記為可投遞,訂閱方最終將收到該消息;Broker服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接收該消息;
(2)、事務(wù)消息的補償流程
a、在網(wǎng)絡(luò)閃斷或者是應(yīng)用重啟的情況下,可能導(dǎo)致生產(chǎn)者發(fā)送的二次確認消息未能到達Broker服務(wù)端,經(jīng)過固定時間后,Broker服務(wù)端將會對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)進行“回查”;
b、生產(chǎn)者收到回查消息后,檢查回查消息對應(yīng)的本地事務(wù)執(zhí)行的最終結(jié)果;
c、生產(chǎn)者根據(jù)本地事務(wù)狀態(tài),再次提交二次確認給Broker,然后Broker重新對半事務(wù)消息Commit或者Rollback;
其中,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況。
事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
- TransactionStatus.CommitTransaction:提交事務(wù),它允許消費者消費此消息。
- TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。
- TransactionStatus.Unknown:中間狀態(tài),它代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù)。
下面我們通過示例演示如何使用RocketMQ的事務(wù)消息。
二、RocketMQ事務(wù)消息使用案例
(1)、定義消息監(jiān)聽器
消息監(jiān)聽器主要是實現(xiàn)TransactionListener接口,然后需要重寫下面兩個方法:
- executeLocalTransaction:執(zhí)行本地事務(wù);
- checkLocalTransaction:回查本地事務(wù)狀態(tài),根據(jù)這次回查的結(jié)果來決定此次事務(wù)是提交還是回滾;
/** * 事務(wù)監(jiān)聽器,重寫執(zhí)行本地事務(wù)方法以及事務(wù)回查方法 */ public class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String msgKey = msg.getKeys(); switch (msgKey) { case "Num0": case "Num1": // 明確回復(fù)回滾操作,消息將會被刪除,不允許被消費。 return LocalTransactionState.ROLLBACK_MESSAGE; case "Num8": case "Num9": // 消息無響應(yīng),代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù) return LocalTransactionState.UNKNOW; default: // 消息通過,允許消費者消費消息 return LocalTransactionState.COMMIT_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("回查本地事務(wù)狀態(tài),消息Key: " + msg.getKeys() + ",消息內(nèi)容: " + new String(msg.getBody())); // 需要根據(jù)業(yè)務(wù),查詢本地事務(wù)是否執(zhí)行成功,這里直接返回COMMIT return LocalTransactionState.COMMIT_MESSAGE; } }
(2)、定義消息生產(chǎn)者
事務(wù)消息的生產(chǎn)者跟我們之前的普通生產(chǎn)者的不同:
- a、需創(chuàng)建事務(wù)類型的生產(chǎn)者TransactionMQProducer;
- b、需調(diào)用setTransactionListener()方法設(shè)置事務(wù)監(jiān)聽器;
- c、使用sendMessageInTransaction()以事務(wù)方式發(fā)送消息;
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 創(chuàng)建事務(wù)類型的生產(chǎn)者 TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group"); // 設(shè)置NameServer的地址 producer.setNamesrvAddr("10.0.90.211:9876"); // 設(shè)置事務(wù)監(jiān)聽器 producer.setTransactionListener(new TransactionListenerImpl()); // 啟動生產(chǎn)者 producer.start(); // 發(fā)送10條消息 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TransactionTopic", "", ("Hello RocketMQ Transaction Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 設(shè)置消息Key msg.setKeys("Num" + i); // 使用事務(wù)方式發(fā)送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("sendResult = " + sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } // 阻塞,目的是為了在消息發(fā)送完成后才關(guān)閉生產(chǎn)者 Thread.sleep(10000); producer.shutdown(); } }
(3)、定義消息消費者
public class MQConsumer { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號;分隔開 mqPushConsumer.setNamesrvAddr("10.0.90.211:9876"); // 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果不是第一次啟動,那么按照上次消費的位置繼續(xù)消費 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("TransactionTopic", "*"); // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消費者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); // 標記該消息已經(jīng)被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 mqPushConsumer.start(); } }
(4)、觀察生產(chǎn)者控制臺輸出
通過控制臺可以看到,生產(chǎn)者成功發(fā)送10條消息,并且我們在事務(wù)監(jiān)聽器中針對message key為Num8、Num9這兩條消息返回UNKNOW狀態(tài),這樣RocketMQ就會執(zhí)行本地事務(wù)回查去確認本地事務(wù)執(zhí)行狀態(tài)【即執(zhí)行checkLocalTransaction()方法】。
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E0E0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=9]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E300001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=10]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E400002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=11]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E650003, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=12]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E780004, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=13]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E880005, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=14]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E990006, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=15]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=16]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EC30008, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=17]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EE30009, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=18]
回查本地事務(wù)狀態(tài),消息Key: Num8,消息內(nèi)容: Hello RocketMQ Transaction Message8
回查本地事務(wù)狀態(tài),消息Key: Num9,消息內(nèi)容: Hello RocketMQ Transaction Message9
(5)、觀察消費者控制臺輸出
可以看到,消費者成功接收到8條消息,因為有2條消息,我們在執(zhí)行本地事務(wù)的時候,明確告訴RocketMQ進行回滾了,所以這2條消息不能被消費者進行消費。
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932288, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931728, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004398, commitLogOffset=17304, bodyCRC=1033347556, reconsumeTimes=0, preparedTransactionOffset=16983, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num2, TRAN_MSG=true, CONSUME_START_TIME=1646898932329, UNIQ_KEY=AC6E00564F4018B4AAC231C40E400002, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 50], transactionId='AC6E00564F4018B4AAC231C40E400002'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message2
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932325, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931741, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000469A, commitLogOffset=18074, bodyCRC=1250988402, reconsumeTimes=0, preparedTransactionOffset=17753, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num3, TRAN_MSG=true, CONSUME_START_TIME=1646898932341, UNIQ_KEY=AC6E00564F4018B4AAC231C40E650003, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 51], transactionId='AC6E00564F4018B4AAC231C40E650003'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message3
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932344, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931758, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000499C, commitLogOffset=18844, bodyCRC=1425278161, reconsumeTimes=0, preparedTransactionOffset=18523, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num4, TRAN_MSG=true, CONSUME_START_TIME=1646898932359, UNIQ_KEY=AC6E00564F4018B4AAC231C40E780004, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 52], transactionId='AC6E00564F4018B4AAC231C40E780004'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message4
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932360, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931774, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004C9E, commitLogOffset=19614, bodyCRC=603141191, reconsumeTimes=0, preparedTransactionOffset=19293, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num5, TRAN_MSG=true, CONSUME_START_TIME=1646898932375, UNIQ_KEY=AC6E00564F4018B4AAC231C40E880005, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 53], transactionId='AC6E00564F4018B4AAC231C40E880005'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message5
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932377, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931801, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004FA0, commitLogOffset=20384, bodyCRC=989488637, reconsumeTimes=0, preparedTransactionOffset=20063, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num6, TRAN_MSG=true, CONSUME_START_TIME=1646898932402, UNIQ_KEY=AC6E00564F4018B4AAC231C40E990006, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 54], transactionId='AC6E00564F4018B4AAC231C40E990006'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message6
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932402, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931816, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F00000000000052A2, commitLogOffset=21154, bodyCRC=1308448107, reconsumeTimes=0, preparedTransactionOffset=20833, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num7, TRAN_MSG=true, CONSUME_START_TIME=1646898932441, UNIQ_KEY=AC6E00564F4018B4AAC231C40EB20007, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 55], transactionId='AC6E00564F4018B4AAC231C40EB20007'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message7
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900749, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935220, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000599B, commitLogOffset=22939, bodyCRC=709195884, reconsumeTimes=0, preparedTransactionOffset=22592, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num9, TRAN_MSG=true, CONSUME_START_TIME=1646898935835, UNIQ_KEY=AC6E00563BCC18B4AAC231C3930D0009, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 57], transactionId='AC6E00563BCC18B4AAC231C3930D0009'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message9
消費者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900727, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935223, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000005B76, commitLogOffset=23414, bodyCRC=1564625146, reconsumeTimes=0, preparedTransactionOffset=22245, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num8, TRAN_MSG=true, CONSUME_START_TIME=1646898935839, UNIQ_KEY=AC6E00563BCC18B4AAC231C392F70008, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 56], transactionId='AC6E00563BCC18B4AAC231C392F70008'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message8
三、RocketMQ事務(wù)消息原理
設(shè)計思想
在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息。
如何實現(xiàn)事務(wù)回查?
Broker會啟動一個消息回查的定時任務(wù),定時從事務(wù)消息queue中讀取所有待反查的消息。針對每個需要反查的半消息,Broker會給對應(yīng)的Producer發(fā)一個要求執(zhí)行事務(wù)狀態(tài)反查的RPC請求。然后根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來決定這個半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來反查。最后,提交或者回滾事務(wù),將半消息標記為已處理狀態(tài)【將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)】。 如果是提交事務(wù),就把半消息從半消息隊列中復(fù)制到該消息真正的topic和queue中; 如果是回滾事務(wù),則什么都不做。
值得注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認回滾該消息。
四、RocketMQ事務(wù)消息使用限制
使用事務(wù)消息,有一些限制條件:
- 事務(wù)消息不支持延時消息和批量消息;
- 事務(wù)性消息可能不止一次被檢查或消費,所以消費者端需要做好消費冪等;
- 為了避免單個消息被檢查太多次而導(dǎo)致半隊列消息累積,我們默認將單個消息的檢查次數(shù)限制為 15 次(即默認只會回查15次),我們可以通過 Broker 配置文件的
transactionCheckMax
參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過 N 次的話( N =transactionCheckMax
), 則 Broker 將丟棄此消息,并在默認情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionCheckListener
類來修改這個行為; - 事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時間長度之后被檢查。當發(fā)送事務(wù)消息時,用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數(shù)優(yōu)先于
transactionMsgTimeout
參數(shù); - 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機制。
- 事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費者。
到此這篇關(guān)于RocketMQ事務(wù)消息原理與使用詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot 項目讀取Resources目錄下的文件(推薦)
這篇文章主要介紹了Springboot 項目讀取Resources目錄下的文件,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11深入淺析SPI機制在JDK與Spring?Boot中的應(yīng)用
SPI是一種使軟件框架或庫更加模塊化、可擴展和可維護的有效方法。通過遵循“開閉原則”,?SPI?確保了系統(tǒng)的穩(wěn)定性和靈活性,從而滿足了不斷變化的業(yè)務(wù)需求,這篇文章主要介紹了SPI機制在JDK與Spring?Boot中的應(yīng)用,需要的朋友可以參考下2023-09-09SpringBoot 下在 yml 中的 logging 日志配置方法
logging 配置主要用于控制應(yīng)用程序的日志輸出行為,可以通過配置定制日志的格式、級別、輸出位置等,這篇文章主要介紹了SpringBoot 下在 yml 中的 logging 日志配置,需要的朋友可以參考下2024-06-06Spring:spring-webmvc和spring-web有哪些區(qū)別
這篇文章主要介紹了Spring:spring-webmvc和spring-web有哪些區(qū)別,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01SpringBoot動態(tài)導(dǎo)出word文檔實整教程(復(fù)制即可使用)
在我們做項目的時候會需要把數(shù)據(jù)庫中的數(shù)據(jù)導(dǎo)出到word當中,下面這篇文章主要給大家介紹了關(guān)于SpringBoot動態(tài)導(dǎo)出word文檔實整教程的相關(guān)資料,文中的代碼復(fù)制即可使用,需要的朋友可以參考下2023-06-06Junit 5中@ParameterizedTest與@EnumSource結(jié)合使用
今天小編就為大家分享一篇關(guān)于Junit 5中@ParameterizedTest與@EnumSource結(jié)合使用,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12