RocketMQ事務(wù)消息保證消息的可靠性和一致性
這篇講解一下rocketMq的事務(wù)消息的原理
在發(fā)送事務(wù)消息的時候,會加一個標(biāo)識,表示這個消息是事務(wù)消息。broker接收到消息后,在我們之前看的代碼里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage會判斷是否是事務(wù)消息。
if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }
sendTransactionPrepareMessage=true表示是事務(wù)消息,所以走了一個單獨的邏輯。
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) { return store.asyncPutMessage(parseHalfMessageInner(messageInner)); }
這里parseHalfMessageInner這個方法里面開始了偷梁換柱,把topic和queueId都改了,把原本的信息先存在變量里面。所以實際上這個消息發(fā)到了半消息專有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
然后其他代碼還是和普通的消息一樣,就是把事務(wù)消息做了轉(zhuǎn)發(fā),存在了RMQ_SYS_TRANS_HALF_TOPIC里面。
到這里發(fā)送半消息就成功了,然后最后客戶端發(fā)送了半消息之后,會查一下本地事務(wù)的情況是否完成。這里有3種情況:commit、rollback、未知。完成和回滾都是確認(rèn)的狀態(tài),這個比較好處理,比較難的是未知。我們先看能得到確認(rèn)結(jié)果的情況。
如果完成和回滾,會給客戶端發(fā)送結(jié)束事務(wù)的消息,這個消息叫END_TRANSACTION,包括消息里面包括了之前發(fā)送的半消息的id和offset。
broker處理的代碼在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根據(jù)offset拿到半消息,然后如果是commit,就是把原本的topic和queueId還原,發(fā)到原本的隊列里面,這樣就可以正常消費了。然后把這個半消息“刪除”。如果是rollBack,也是拿到這個半消息,然后直接“刪除”就可以了。接下來看一下怎么“刪除”。
為什么我刪除會打引號呢?因為半消息其實就是跟正常的消息一樣,存在commitLog文件里面,mq的設(shè)計,就沒有刪除這個功能。所以所謂的刪除其實就是把這個消息消費掉,不做任何處理,就是刪除了。
想象一下,這個半消息有commit/rollBack/未知,3種狀態(tài),未知的肯定不能刪除,那他怎么知道哪些消息是可以刪除的呢?總不能所有的都再去客戶端查一下事務(wù)的結(jié)果吧?mq怎么做的呢?前面提到的刪除其實就是把這些commit和rollBack處理過后的半消息,再保存起來,后面消費半消息的數(shù)據(jù)的時候,只要從里面查一下是否需要刪除就可以了。
這里又有一個問題,怎么把需要刪除的半消息存起來呢?mq存儲數(shù)據(jù)就是commitLog,所以其實這些需要刪除的數(shù)據(jù),就是又發(fā)到了一個特定的topic里面。這個topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意區(qū)分,原本半消息的topic名字是half_topic,這個topic名字是op_half_topic,存儲的是處理過后,可以刪除的半消息。
所以說前面提到的帶引號的“刪除”,就是把消息發(fā)到op_half_topic就表示是刪除了,這個op_half_topic消息的內(nèi)容就是half_topic的offset。那么現(xiàn)在需要有個地方,來消費half_topic,然后判斷是否存在于op_half_topic,如果是表示可以刪除了,如果不是,就接著保存起來。
處理邏輯就在TransactionalMessageCheckService這個定時任務(wù)中。具體是在TransactionalMessageServiceImpl#check方法里面
@Override public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; // 先拿到半消息 Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues); for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); // 拿到半消息的最小偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 拿到op_half的最小偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; } List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); // 拉取op的消息(32條),op消息內(nèi)容是half的offset,跟half_topic的最小offset比較,如果op的小于最小的,就說明已經(jīng)處理過了,放在doneOpOffset,反之,則說明還沒處理過,就先放在removeMap里面 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; } // single thread int getMessageNullCount = 1; long newOffset = halfOffset; long i = halfOffset; // 然后對half_topic進(jìn)行處理 while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } // 如果這個offset已經(jīng)處理過了,就接著處理下一個 if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { // 如果沒有處理過,就要把數(shù)據(jù)撈出來重新投遞 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) { log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout || valueOfCurrentMinusBorn <= -1; if (isNeedCheck) { // 重新投遞 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 再重新確認(rèn)事務(wù) listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } // 更新offset if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } } } catch (Throwable e) { log.error("Check error", e); } }
我講解一下這個代碼做了啥。我們先明確這個代碼是要實現(xiàn)什么功能。就是消費half_topic,然后去根據(jù)op_half_topic的數(shù)據(jù)來判斷half_topc的消息是否被處理過,處理過了就直接忽略、丟棄,如果沒有處理過,就“保留”這個消息,等待后面事務(wù)確認(rèn)了再處理。
這里“保留”我也是加了引號,因為mq消費是一條一條按順序消費,如果中間有一個數(shù)據(jù)卡住了,后面數(shù)據(jù)就沒法消費了。所以這里“保留”,其實也是消費了,只是他消費到了不確定結(jié)果的消息,他是重新投遞到了half_topic,來實現(xiàn)“保留”的目的。
好了,明確了這個代碼實現(xiàn)的功能,我們來一步步看一下細(xì)節(jié)。
首先是拿到half_topic和op_half_topic的offset,知道現(xiàn)在是消費到了哪里。然后去拉取op_half_topic,每次32條,op_half消息內(nèi)容存的是half_topic的offset,只要判斷這條op_half里面的offset小于half_topic的offset,就表示已經(jīng)消費過了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示還沒消費,放入removeMap,就表示這個半消息可以放心刪除了。
這一步,通過消費op_half,跟half_topic的minOffset做比較,構(gòu)建了doneOpOffset,和removeMap。
然后就是消費half_topic的消息,只要判斷每條消息的offset是否在removeMap中,就表示可以刪除,放入doneOpOffset中,直接消費下一條數(shù)據(jù),所以這里其實也不用真的拉取half_topic的消息,只要用offset來判斷就行,消費過了,offset+1,就可以去判斷下一條消息。
如果half_topic的offset沒有在removeMap中,就表示暫時還不知道結(jié)果,這時候就重新發(fā)送到half_topic,重新投遞之后,然后給客戶端發(fā)送一個檢查事務(wù)的請求,客戶端檢測過后,還是用之前的END_TRANSACTION命令,再發(fā)給broker,broker就會放到op_half里面,等于就是重新發(fā)了一個半消息的流程,實現(xiàn)了閉環(huán)。
最后就是更新兩個topic的offset了。之前的doneOpOffset保存下來,就是為了更新op_half的offset,只有都處理過了,才會更新,如果中間有一個沒有處理,就會阻塞在那條消息。
總結(jié):
所以現(xiàn)在的情況是這樣的,對于half_topic的半消息如果有結(jié)果就忽略,如果沒有結(jié)果就重新投遞,不會阻塞,所以half_topic的offset會一直往后更新。但是op_half要等所有的都done了,才會更新offset。假設(shè)一種情況,如果op_offset1對于的是half_offset1這個消息,然后half_offset1剛好被消費,重新投遞了。這是op_offset1找不到對應(yīng)的半消息,所以不會被消費。但是不會被卡主,等到下次的時候,op_offset1這個數(shù)據(jù)的offset已經(jīng)小于half_offset1這個消息的offset,所以這個op_offset1也會當(dāng)做已經(jīng)處理過了。
可以看到整個過程其實很巧妙,大家可以結(jié)合代碼捋一捋。
到此這篇關(guān)于RocketMQ事務(wù)消息保證消息的可靠性和一致性的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Spring開發(fā)環(huán)境搭建及簡單入門示例教程
這篇文章主要介紹了Java Spring開發(fā)環(huán)境搭建及簡單入門示例,結(jié)合實例形式分析了spring環(huán)境搭建、配置、使用方法及相關(guān)注意事項,需要的朋友可以參考下2017-11-11Java使用Junit4.jar進(jìn)行單元測試的方法
今天通過本文給大家介紹Java使用Junit4.jar進(jìn)行單元測試的方法,本文通過圖文實例相結(jié)合給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-11-11Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建教程
這篇文章主要給大家介紹了關(guān)于Spring Boot整合Spring Security簡單實現(xiàn)登入登出從零搭建的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起看看吧2018-09-09Java?循環(huán)隊列/環(huán)形隊列的實現(xiàn)流程
循環(huán)隊列又叫環(huán)形隊列,是一種特殊的隊列。循環(huán)隊列解決了隊列出隊時需要將所有數(shù)據(jù)前移一位的問題。本文將帶大家詳細(xì)了解循環(huán)隊列如何實現(xiàn),需要的朋友可以參考一下2022-02-02Mybatis中使用updateBatch進(jìn)行批量更新
這篇文章主要介紹了Mybatis中使用updateBatch進(jìn)行批量更新的相關(guān)資料,有逐條更新,sql批量更新等,具體實例代碼大家參考下本文2018-04-04Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的方法
這篇文章主要給大家介紹了關(guān)于Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Spring boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04iOS獲取AppIcon and LaunchImage''s name(app圖標(biāo)和啟動圖片名字)
這篇文章主要介紹了iOS獲取AppIcon and LaunchImage's name(app圖標(biāo)和啟動圖片名字)的相關(guān)資料,非常不錯,具有參考借鑒價值,感興趣的朋友一起學(xué)習(xí)吧2016-08-08圖解Spring Security 中用戶是如何實現(xiàn)登錄的
這篇文章主要介紹了圖解Spring Security 中用戶是如何實現(xiàn)登錄的,文中通過示例代碼和圖片介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07