RocketMQ事務消息保證消息的可靠性和一致性
這篇講解一下rocketMq的事務消息的原理
在發(fā)送事務消息的時候,會加一個標識,表示這個消息是事務消息。broker接收到消息后,在我們之前看的代碼里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage會判斷是否是事務消息。
if (sendTransactionPrepareMessage) {
asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
sendTransactionPrepareMessage=true表示是事務消息,所以走了一個單獨的邏輯。
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
這里parseHalfMessageInner這個方法里面開始了偷梁換柱,把topic和queueId都改了,把原本的信息先存在變量里面。所以實際上這個消息發(fā)到了半消息專有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
然后其他代碼還是和普通的消息一樣,就是把事務消息做了轉發(fā),存在了RMQ_SYS_TRANS_HALF_TOPIC里面。
到這里發(fā)送半消息就成功了,然后最后客戶端發(fā)送了半消息之后,會查一下本地事務的情況是否完成。這里有3種情況:commit、rollback、未知。完成和回滾都是確認的狀態(tài),這個比較好處理,比較難的是未知。我們先看能得到確認結果的情況。
如果完成和回滾,會給客戶端發(fā)送結束事務的消息,這個消息叫END_TRANSACTION,包括消息里面包括了之前發(fā)送的半消息的id和offset。
broker處理的代碼在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根據offset拿到半消息,然后如果是commit,就是把原本的topic和queueId還原,發(fā)到原本的隊列里面,這樣就可以正常消費了。然后把這個半消息“刪除”。如果是rollBack,也是拿到這個半消息,然后直接“刪除”就可以了。接下來看一下怎么“刪除”。
為什么我刪除會打引號呢?因為半消息其實就是跟正常的消息一樣,存在commitLog文件里面,mq的設計,就沒有刪除這個功能。所以所謂的刪除其實就是把這個消息消費掉,不做任何處理,就是刪除了。
想象一下,這個半消息有commit/rollBack/未知,3種狀態(tài),未知的肯定不能刪除,那他怎么知道哪些消息是可以刪除的呢?總不能所有的都再去客戶端查一下事務的結果吧?mq怎么做的呢?前面提到的刪除其實就是把這些commit和rollBack處理過后的半消息,再保存起來,后面消費半消息的數據的時候,只要從里面查一下是否需要刪除就可以了。
這里又有一個問題,怎么把需要刪除的半消息存起來呢?mq存儲數據就是commitLog,所以其實這些需要刪除的數據,就是又發(fā)到了一個特定的topic里面。這個topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意區(qū)分,原本半消息的topic名字是half_topic,這個topic名字是op_half_topic,存儲的是處理過后,可以刪除的半消息。
所以說前面提到的帶引號的“刪除”,就是把消息發(fā)到op_half_topic就表示是刪除了,這個op_half_topic消息的內容就是half_topic的offset。那么現(xiàn)在需要有個地方,來消費half_topic,然后判斷是否存在于op_half_topic,如果是表示可以刪除了,如果不是,就接著保存起來。
處理邏輯就在TransactionalMessageCheckService這個定時任務中。具體是在TransactionalMessageServiceImpl#check方法里面
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// 先拿到半消息
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
// 拿到半消息的最小偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 拿到op_half的最小偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
// 拉取op的消息(32條),op消息內容是half的offset,跟half_topic的最小offset比較,如果op的小于最小的,就說明已經處理過了,放在doneOpOffset,反之,則說明還沒處理過,就先放在removeMap里面
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
// 然后對half_topic進行處理
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// 如果這個offset已經處理過了,就接著處理下一個
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// 如果沒有處理過,就要把數據撈出來重新投遞
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
|| opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
|| valueOfCurrentMinusBorn <= -1;
if (isNeedCheck) {
// 重新投遞
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 再重新確認事務
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
// 更新offset
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}
}
我講解一下這個代碼做了啥。我們先明確這個代碼是要實現(xiàn)什么功能。就是消費half_topic,然后去根據op_half_topic的數據來判斷half_topc的消息是否被處理過,處理過了就直接忽略、丟棄,如果沒有處理過,就“保留”這個消息,等待后面事務確認了再處理。
這里“保留”我也是加了引號,因為mq消費是一條一條按順序消費,如果中間有一個數據卡住了,后面數據就沒法消費了。所以這里“保留”,其實也是消費了,只是他消費到了不確定結果的消息,他是重新投遞到了half_topic,來實現(xiàn)“保留”的目的。
好了,明確了這個代碼實現(xiàn)的功能,我們來一步步看一下細節(jié)。
首先是拿到half_topic和op_half_topic的offset,知道現(xiàn)在是消費到了哪里。然后去拉取op_half_topic,每次32條,op_half消息內容存的是half_topic的offset,只要判斷這條op_half里面的offset小于half_topic的offset,就表示已經消費過了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示還沒消費,放入removeMap,就表示這個半消息可以放心刪除了。
這一步,通過消費op_half,跟half_topic的minOffset做比較,構建了doneOpOffset,和removeMap。
然后就是消費half_topic的消息,只要判斷每條消息的offset是否在removeMap中,就表示可以刪除,放入doneOpOffset中,直接消費下一條數據,所以這里其實也不用真的拉取half_topic的消息,只要用offset來判斷就行,消費過了,offset+1,就可以去判斷下一條消息。
如果half_topic的offset沒有在removeMap中,就表示暫時還不知道結果,這時候就重新發(fā)送到half_topic,重新投遞之后,然后給客戶端發(fā)送一個檢查事務的請求,客戶端檢測過后,還是用之前的END_TRANSACTION命令,再發(fā)給broker,broker就會放到op_half里面,等于就是重新發(fā)了一個半消息的流程,實現(xiàn)了閉環(huán)。
最后就是更新兩個topic的offset了。之前的doneOpOffset保存下來,就是為了更新op_half的offset,只有都處理過了,才會更新,如果中間有一個沒有處理,就會阻塞在那條消息。
總結:
所以現(xiàn)在的情況是這樣的,對于half_topic的半消息如果有結果就忽略,如果沒有結果就重新投遞,不會阻塞,所以half_topic的offset會一直往后更新。但是op_half要等所有的都done了,才會更新offset。假設一種情況,如果op_offset1對于的是half_offset1這個消息,然后half_offset1剛好被消費,重新投遞了。這是op_offset1找不到對應的半消息,所以不會被消費。但是不會被卡主,等到下次的時候,op_offset1這個數據的offset已經小于half_offset1這個消息的offset,所以這個op_offset1也會當做已經處理過了。
可以看到整個過程其實很巧妙,大家可以結合代碼捋一捋。
到此這篇關于RocketMQ事務消息保證消息的可靠性和一致性的文章就介紹到這了,更多相關RocketMQ事務消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java Spring開發(fā)環(huán)境搭建及簡單入門示例教程
這篇文章主要介紹了Java Spring開發(fā)環(huán)境搭建及簡單入門示例,結合實例形式分析了spring環(huán)境搭建、配置、使用方法及相關注意事項,需要的朋友可以參考下2017-11-11
Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建教程
這篇文章主要給大家介紹了關于Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起看看吧2018-09-09
Java?循環(huán)隊列/環(huán)形隊列的實現(xiàn)流程
循環(huán)隊列又叫環(huán)形隊列,是一種特殊的隊列。循環(huán)隊列解決了隊列出隊時需要將所有數據前移一位的問題。本文將帶大家詳細了解循環(huán)隊列如何實現(xiàn),需要的朋友可以參考一下2022-02-02
Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的方法
這篇文章主要給大家介紹了關于Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Spring boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-04-04
iOS獲取AppIcon and LaunchImage''s name(app圖標和啟動圖片名字)
這篇文章主要介紹了iOS獲取AppIcon and LaunchImage's name(app圖標和啟動圖片名字)的相關資料,非常不錯,具有參考借鑒價值,感興趣的朋友一起學習吧2016-08-08
圖解Spring Security 中用戶是如何實現(xiàn)登錄的
這篇文章主要介紹了圖解Spring Security 中用戶是如何實現(xiàn)登錄的,文中通過示例代碼和圖片介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-07-07

