RocketMQ事務(wù)消息使用與原理詳解
一、背景&概述
最近在找工作,面試過程中被多次問到事務(wù)消息的實現(xiàn)原理,另外在分布式事務(wù)解決方案中,事務(wù)消息也是一個不錯的解決方案,本篇文章將圍繞RocketMQ的事務(wù)消息實現(xiàn)展開描述。
二、應(yīng)用場景
所謂事務(wù)消息,其實是為了解決上下游寫一致性,以及強依賴解耦,也即是完成當(dāng)前操作的同時給下游發(fā)送指令,并且保證上下游要么同時成功或者同時失敗,并且考慮上游的性能和RT問題做出的強調(diào)用解耦妥協(xié)。常見的應(yīng)用場景有:
1.訂單履約指令下發(fā)
用戶下單成功后,給履約系統(tǒng)發(fā)送指令進行履約操作,下單失敗不發(fā)送指令,采購缺貨或者其他履約異常,反向觸發(fā)訂單取消或者其他兜底操作。
2.用戶轉(zhuǎn)賬
用戶發(fā)起轉(zhuǎn)賬后,交易狀態(tài)短暫掛起,發(fā)送指令給銀行,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待結(jié)果更新交易狀態(tài)。
3.訂單支付
支付發(fā)起后,當(dāng)筆訂單處于中間狀態(tài),給支付網(wǎng)關(guān)發(fā)起指令,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待支付網(wǎng)關(guān)反饋更新支付狀態(tài)。
三、使用方式
1.事務(wù)消息監(jiān)聽器
@Component @Slf4j public class OrderTransactionalListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { log.info("開始執(zhí)行本地事務(wù)...."); LocalTransactionState state; try{ String body = new String(message.getBody()); OrderDTO order = JSONObject.parseObject(body, OrderDTO.class); orderService.createOrder(order,message.getTransactionId()); state = LocalTransactionState.COMMIT_MESSAGE; log.info("本地事務(wù)已提交。{}",message.getTransactionId()); }catch (Exception e){ log.error("執(zhí)行本地事務(wù)失敗。{}",e); state = LocalTransactionState.ROLLBACK_MESSAGE; } return state; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { log.info("開始回查本地事務(wù)狀態(tài)。{}",messageExt.getTransactionId()); LocalTransactionState state; String transactionId = messageExt.getTransactionId(); if (transactionLogService.get(transactionId)>0){ state = LocalTransactionState.COMMIT_MESSAGE; }else { state = LocalTransactionState.UNKNOW; } log.info("結(jié)束本地事務(wù)狀態(tài)查詢:{}",state); return state; } }
2.編寫事務(wù)消息生產(chǎn)者
@Component @Slf4j public class TransactionalMsgProducer implements InitializingBean, DisposableBean { private String GROUP = "order_transactional"; private TransactionMQProducer msgProducer; //用于執(zhí)行本地事務(wù)和事務(wù)狀態(tài)回查的監(jiān)聽器 @Autowired private OrderTransactionalListener orderTransactionListener; //執(zhí)行任務(wù)的線程池 private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); private void start(){ try { this.msgProducer.start(); } catch (MQClientException e) { log.error("msg producer starter occur error;",e); } } private void shutdown() { if(null != msgProducer) { try { msgProducer.shutdown(); } catch (Exception e) { log.error("producer shutdown occur error;",e); } } } public TransactionSendResult send(String data, String topic) throws MQClientException { Message message = new Message(topic,data.getBytes()); return this.msgProducer.sendMessageInTransaction(message, null); } @Override public void afterPropertiesSet() throws Exception { msgProducer = new TransactionMQProducer(GROUP); msgProducer.setNamesrvAddr("namesrvHost:ip"); msgProducer.setSendMsgTimeout(Integer.MAX_VALUE); msgProducer.setExecutorService(executor); msgProducer.setTransactionListener(orderTransactionListener); this.start(); } @Override public void destroy() throws Exception { this.shutdown(); } }
3.業(yè)務(wù)實現(xiàn)
@Service @Slf4j public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private TransactionLogMapper transactionLogMapper; @Autowired private TransactionalMsgProducer producer; //執(zhí)行本地事務(wù)時調(diào)用,將訂單數(shù)據(jù)和事務(wù)日志寫入本地數(shù)據(jù)庫 @Transactional @Override public void createOrder(OrderDTO orderDTO,String transactionId){ //1.創(chuàng)建訂單 Order order = new Order(); BeanUtils.copyProperties(orderDTO,order); orderMapper.createOrder(order); //2.寫入事務(wù)日志 TransactionLog log = new TransactionLog(); log.setId(transactionId); log.setBusiness("order"); log.setForeignKey(String.valueOf(order.getId())); transactionLogMapper.insert(log); log.info("create order success,order={}",orderDTO); } //前端調(diào)用,只用于向RocketMQ發(fā)送事務(wù)消息 @Override public void createOrder(OrderDTO order) throws MQClientException { order.setId(snowflake.nextId()); order.setOrderNo(snowflake.nextIdStr()); producer.send(JSON.toJSONString(order),"order"); } }
4.入口調(diào)用
@RestController @Slf4j public class OrderController { @Autowired private OrderService orderService; @PostMapping("/create_order") public void createOrder(@RequestBody OrderDTO order) { log.info("receive order data,order={}",order.getCommodityCode()); orderService.createOrder(order); } }
這樣我們就實現(xiàn)了rocketmq事務(wù)消息的使用。
四、原理介紹
1.概念模型
- 半消息(half message):半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費(消費端不可見)。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認時,該事務(wù)消息就處于"暫時不可被消費"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
- 消息狀態(tài)回查(Message status check):由于網(wǎng)絡(luò)抖動閃斷、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認消息沒有成功送達。如果Broker檢測到某條事務(wù)消息長時間處于半消息狀態(tài),則會主動向Producer端發(fā)起回查操作,查詢該事務(wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分布式事務(wù)中的超時問題。
2.執(zhí)行流程
1):Producer向Broker端發(fā)送Half Message;
2):Broker ACK,Half Message發(fā)送成功;
3):Producer執(zhí)行本地事務(wù);
4):本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài),Producer向Broker發(fā)送二次確認消息,確認該Half Message的Commit或者Rollback狀態(tài)。Broker收到二次確認消息后,對于Commit狀態(tài),則直接發(fā)送到Consumer端執(zhí)行消費邏輯,而對于Rollback則直接標(biāo)記為失敗,一段時間后清除,并不會發(fā)給Consumer。正常情況下,到此分布式事務(wù)已經(jīng)完成,剩下要處理的就是超時問題,即一段時間后Broker仍沒有收到Producer的二次確認消息;
5):針對超時狀態(tài),Broker主動向Producer發(fā)起消息回查;
6):Producer處理回查消息,返回對應(yīng)的本地事務(wù)的執(zhí)行結(jié)果;
7):Broker針對回查消息的結(jié)果,執(zhí)行Commit或Rollback操作,同4。
3.事務(wù)消息設(shè)計
在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
在RocketMQ中,消息在服務(wù)端的存儲結(jié)構(gòu)如下,每條消息都會有對應(yīng)的索引信息,Consumer通過ConsumeQueue這個二級索引來讀取消息實體內(nèi)容,其流程如下:
RocketMQ的具體實現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。
在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對于Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序?qū)懳募模?。但是區(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個操作來標(biāo)識這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒有對應(yīng)的Op消息,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引。
一階段的Half消息由于是寫到一個特殊的Topic,所以二階段構(gòu)建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務(wù)消息二階段其實是利用了一階段存儲的消息的內(nèi)容,在二階段時恢復(fù)出一條完整的普通消息。
如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。Broker端對未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài),進而執(zhí)行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務(wù)消息的回查并且推進CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
需要注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認回滾該消息。
五、源碼分析
1.客服端發(fā)送事務(wù)消息
RocketMQ事務(wù)消息由TransactionMQProducer實現(xiàn),繼承DefaultMQProducer實現(xiàn)了發(fā)送事務(wù)消息的能力。
發(fā)送事務(wù)消息會調(diào)用TransactionMQProducer的sendMessageInTransaction方法:
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }
檢查有沒有配置事務(wù)監(jiān)聽器,監(jiān)聽器提供了兩個方法:
- executeLocalTransaction:執(zhí)行本地事務(wù)
- checkLocalTransaction:回查本地事務(wù)
然后調(diào)用DefaultMQProducerImpl執(zhí)行發(fā)送:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { //...省略 SendResult sendResult = null; //msg設(shè)置參數(shù)TRAN_MSG,表示為事務(wù)消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //發(fā)送消息 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } //通過LocalTransactionExecutor執(zhí)行,已經(jīng)廢棄 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { //消息發(fā)送成功,執(zhí)行本地事務(wù) localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } } catch (Throwable e) { localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } //省略... return transactionSendResult; }
該方法做了以下幾件事情:
- 給消息打上事務(wù)屬性,用于broker區(qū)分普通消息和事務(wù)消息
- 發(fā)送半消息(half message)
- 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
- 執(zhí)行endTransaction方法,通知broker 執(zhí)行 commit/rollback
發(fā)送消息會正常調(diào)用DefaultMQProducerImpl的發(fā)送消息邏輯,執(zhí)行本地事務(wù)通過transactionListener調(diào)用本地的事務(wù)邏輯,我們看一下結(jié)束事務(wù)endTransaction方法實現(xiàn):
public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。
如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。
2.Broker處理事務(wù)消息
RocketMQ服務(wù)端有個NettyRequestProcessor接口,類似于spring的BeanPostProcessor,broker啟動的時候會把對應(yīng)的實現(xiàn)注冊到NettyRemotingServer的本地緩存processorTable中,在收到producer發(fā)送的消息會調(diào)用NettyServerHandler的channelRead0方法,然后會調(diào)用對應(yīng)的NettyRequestProcessor實現(xiàn)處理接收到的消息請求。看一下SendMessageProcessor實現(xiàn):
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext traceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } traceContext = buildMsgContext(ctx, requestHeader); String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); traceContext.setCommercialOwner(owner); try { this.executeSendMessageHookBefore(ctx, request, traceContext); } catch (AbortProcessException e) { final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()); errorResponse.setOpaque(request.getOpaque()); return errorResponse; } RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext, (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1)); } else { response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext, (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12)); } return response; } }
會調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進行半消息存儲:
public RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader, final TopicQueueMappingContext mappingContext, final SendMessageCallback sendMessageCallback) throws RemotingCommandException { //...省略 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); boolean sendTransactionPrepareMessage = false; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } sendTransactionPrepareMessage = true; } long beginTimeMillis = this.brokerController.getMessageStore().now(); if (brokerController.getBrokerConfig().isAsyncSendEnable()) { //...異步發(fā)送 return null; } else { PutMessageResult putMessageResult = null; if (sendTransactionPrepareMessage) { //存儲事務(wù)消息 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { //存儲普通消息 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } return response; } }
繼續(xù)看事務(wù)半消息存儲實現(xiàn)prepareMessage:
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
備份消息的原主題名稱與原隊列ID,然后取消事務(wù)消息的消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。
到這里,broker就初步處理完了 Producer 發(fā)送的事務(wù)半消息。
當(dāng)客戶端TransactionMQProducer執(zhí)行endTransaction動作時,觸發(fā)broker事務(wù)消息的二階段提交,broker會執(zhí)行EndTransactionProcessor的processRequest方法:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { //...省略 OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } } response.setCode(result.getResponseCode()); response.setRemark(result.getResponseRemark()); return response; }
邏輯很清晰,其核心實現(xiàn)如下:
- 根據(jù)commitlogOffset找到消息
- 如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
- 回滾消息,則直接將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
還有一種情況,如果本地事務(wù)執(zhí)行結(jié)果是UNKNOW或者由于網(wǎng)絡(luò)問題沒有提交,那么存儲的broker的事務(wù)消息處于漂浮狀態(tài),無法主動轉(zhuǎn)換成可消費或者刪除狀態(tài),那么就需要broker有一種兜底機制來處理這種場景,當(dāng)然RocketMQ提供了一種補償機制,定時回查此類消息,由TransactionalMessageCheckService實現(xiàn):
@Override public void run() { log.info("Start transaction check service thread!"); while (!this.isStopped()) { long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); this.waitForRunning(checkInterval); } log.info("End transaction check service thread!"); } @Override protected void onWaitEnd() { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
整體流程如下圖:
六、總結(jié)與思考
異常情況覆蓋
- 客戶端producer發(fā)送半消息失敗
可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 發(fā)送半消息(prepare)失敗??蛻舳朔?wù)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等。
- 半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗
如果producer發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。這種情況下,執(zhí)行本地事務(wù)失敗后,會返回rollback給 MQ,MQ會刪除之前發(fā)送的半消息。不會下發(fā)指令給下游依賴。
- 半消息投遞成功,沒收到MQ返回的ack
如果客戶端發(fā)送半消息成功后,沒有收到MQ返回的響應(yīng)??赡苁且驗榫W(wǎng)絡(luò)問題,或者其他未知異常,客戶端以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。這個時候其實mq已經(jīng)保存半消息成功了,那這個消息怎么處理?
這個時候broker的補償邏輯上場,消息回查定時任務(wù)TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時MQ就會發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時就要發(fā)送rollback請求給mq,刪除之前的半消息。
- commit/rollback失敗
這個也是通過定時任務(wù)TransactionalMessageCheckService來做補償,它發(fā)現(xiàn)這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務(wù)。
缺點和替代方案
事務(wù)消息很好了解決了分布式事務(wù)場景的業(yè)務(wù)解耦,但是也存在一些問題,比如引入新的組件依賴,并且事務(wù)消息是強依賴,那么還有沒有其他比較可行的替代方案,ebay提出的本地消息表是一種解決方案,消息生產(chǎn)方新增消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個事務(wù)里提交,也就是說他們要在一個數(shù)據(jù)庫里面。然后消息會經(jīng)過MQ發(fā)送到消息的消費方。如果消息發(fā)送失敗,會進行重試發(fā)送。消息消費方,需要處理這個消息,并完成自己的業(yè)務(wù)邏輯。此時如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個業(yè)務(wù)補償消息,通知生產(chǎn)方進行回滾等操作。
本地消息表的優(yōu)點是避免了分布式事務(wù),實現(xiàn)了最終一致性,缺點也明顯,消息表會耦合到業(yè)務(wù)系統(tǒng)中,如果沒有封裝好的解決方案,會有很多支撐邏輯要處理。
以上就是RocketMQ事務(wù)消息使用與原理詳解的詳細內(nèi)容,更多關(guān)于RocketMQ 事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java?byte數(shù)組轉(zhuǎn)String的幾種常用方法
在Java中數(shù)組是一種非常常見的數(shù)據(jù)結(jié)構(gòu),它可以用來存儲多個相同類型的數(shù)據(jù),有時候,我們需要將數(shù)組轉(zhuǎn)換為字符串,以便于輸出或者傳遞給其他方法,這篇文章主要給大家介紹了關(guān)于java?byte數(shù)組轉(zhuǎn)String的幾種常用方法,需要的朋友可以參考下2024-09-09基于springboot i18n國際化后臺多種語言設(shè)置的方式
這篇文章主要介紹了基于springboot i18n國際化后臺多種語言設(shè)置的方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06IntelliJ?IDEA?2022.1.1創(chuàng)建java項目的詳細方法步驟
最近安裝了IntelliJ IDEA 2022.1.1,發(fā)現(xiàn)新版本的窗口還有些變化的,所以下面這篇文章主要給大家介紹了關(guān)于IntelliJ?IDEA?2022.1.1創(chuàng)建java項目的詳細方法步驟,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2022-07-07@RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理
這篇文章主要介紹了@RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10