欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ事務(wù)消息使用與原理詳解

 更新時間:2023年07月03日 09:39:31   作者:叔牙  
這篇文章主要為大家介紹了RocketMQ事務(wù)消息的實現(xiàn)原理,在分布式事務(wù)解決方案中,事務(wù)消息也是一個不錯的解決方案,本篇文章將圍繞RocketMQ的事務(wù)消息實現(xiàn)展開描述,需要的朋友可以參考下

一、背景&概述

最近在找工作,面試過程中被多次問到事務(wù)消息的實現(xiàn)原理,另外在分布式事務(wù)解決方案中,事務(wù)消息也是一個不錯的解決方案,本篇文章將圍繞RocketMQ的事務(wù)消息實現(xiàn)展開描述。

二、應(yīng)用場景

所謂事務(wù)消息,其實是為了解決上下游寫一致性,以及強依賴解耦,也即是完成當(dāng)前操作的同時給下游發(fā)送指令,并且保證上下游要么同時成功或者同時失敗,并且考慮上游的性能和RT問題做出的強調(diào)用解耦妥協(xié)。常見的應(yīng)用場景有:

1.訂單履約指令下發(fā)

用戶下單成功后,給履約系統(tǒng)發(fā)送指令進行履約操作,下單失敗不發(fā)送指令,采購缺貨或者其他履約異常,反向觸發(fā)訂單取消或者其他兜底操作。

2.用戶轉(zhuǎn)賬

用戶發(fā)起轉(zhuǎn)賬后,交易狀態(tài)短暫掛起,發(fā)送指令給銀行,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待結(jié)果更新交易狀態(tài)。

3.訂單支付

支付發(fā)起后,當(dāng)筆訂單處于中間狀態(tài),給支付網(wǎng)關(guān)發(fā)起指令,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待支付網(wǎng)關(guān)反饋更新支付狀態(tài)。

三、使用方式

1.事務(wù)消息監(jiān)聽器

@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("開始執(zhí)行本地事務(wù)....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事務(wù)已提交。{}",message.getTransactionId());
        }catch (Exception e){
            log.error("執(zhí)行本地事務(wù)失敗。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("開始回查本地事務(wù)狀態(tài)。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("結(jié)束本地事務(wù)狀態(tài)查詢:{}",state);
        return state;
    }
}

2.編寫事務(wù)消息生產(chǎn)者

@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
    private String GROUP = "order_transactional";
    private TransactionMQProducer msgProducer;
    //用于執(zhí)行本地事務(wù)和事務(wù)狀態(tài)回查的監(jiān)聽器
    @Autowired
    private OrderTransactionalListener orderTransactionListener;
    //執(zhí)行任務(wù)的線程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
    private void start(){
        try {
            this.msgProducer.start();
        } catch (MQClientException e) {
            log.error("msg producer starter occur error;",e);
        }
    }
    private void shutdown() {
        if(null != msgProducer) {
            try {
                msgProducer.shutdown();
            } catch (Exception e) {
                log.error("producer shutdown occur error;",e);
            }
        }
    }
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.msgProducer.sendMessageInTransaction(message, null);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        msgProducer = new TransactionMQProducer(GROUP);
        msgProducer.setNamesrvAddr("namesrvHost:ip");
        msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        msgProducer.setExecutorService(executor);
        msgProducer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    @Override
    public void destroy() throws Exception {
        this.shutdown();
    }
}

3.業(yè)務(wù)實現(xiàn)

@Service
@Slf4j
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private  TransactionLogMapper transactionLogMapper;
    @Autowired
    private TransactionalMsgProducer producer;
    //執(zhí)行本地事務(wù)時調(diào)用,將訂單數(shù)據(jù)和事務(wù)日志寫入本地數(shù)據(jù)庫
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){
        //1.創(chuàng)建訂單
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);
        //2.寫入事務(wù)日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        log.info("create order success,order={}",orderDTO);
    }
    //前端調(diào)用,只用于向RocketMQ發(fā)送事務(wù)消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}

4.入口調(diào)用

@RestController
@Slf4j
public class OrderController {
    @Autowired
    private OrderService orderService;
    @PostMapping("/create_order")
    public void createOrder(@RequestBody OrderDTO order) {
        log.info("receive order data,order={}",order.getCommodityCode());
        orderService.createOrder(order);
    }
}

這樣我們就實現(xiàn)了rocketmq事務(wù)消息的使用。

四、原理介紹

1.概念模型

  • 半消息(half message):半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費(消費端不可見)。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認時,該事務(wù)消息就處于"暫時不可被消費"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
  • 消息狀態(tài)回查(Message status check):由于網(wǎng)絡(luò)抖動閃斷、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認消息沒有成功送達。如果Broker檢測到某條事務(wù)消息長時間處于半消息狀態(tài),則會主動向Producer端發(fā)起回查操作,查詢該事務(wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分布式事務(wù)中的超時問題。

2.執(zhí)行流程

1):Producer向Broker端發(fā)送Half Message;
2):Broker ACK,Half Message發(fā)送成功;
3):Producer執(zhí)行本地事務(wù);
4):本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài),Producer向Broker發(fā)送二次確認消息,確認該Half Message的Commit或者Rollback狀態(tài)。Broker收到二次確認消息后,對于Commit狀態(tài),則直接發(fā)送到Consumer端執(zhí)行消費邏輯,而對于Rollback則直接標(biāo)記為失敗,一段時間后清除,并不會發(fā)給Consumer。正常情況下,到此分布式事務(wù)已經(jīng)完成,剩下要處理的就是超時問題,即一段時間后Broker仍沒有收到Producer的二次確認消息;
5):針對超時狀態(tài),Broker主動向Producer發(fā)起消息回查;
6):Producer處理回查消息,返回對應(yīng)的本地事務(wù)的執(zhí)行結(jié)果;
7):Broker針對回查消息的結(jié)果,執(zhí)行Commit或Rollback操作,同4。

3.事務(wù)消息設(shè)計

在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
在RocketMQ中,消息在服務(wù)端的存儲結(jié)構(gòu)如下,每條消息都會有對應(yīng)的索引信息,Consumer通過ConsumeQueue這個二級索引來讀取消息實體內(nèi)容,其流程如下:

RocketMQ的具體實現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。
在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對于Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序?qū)懳募模?。但是區(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個操作來標(biāo)識這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒有對應(yīng)的Op消息,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引。
一階段的Half消息由于是寫到一個特殊的Topic,所以二階段構(gòu)建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務(wù)消息二階段其實是利用了一階段存儲的消息的內(nèi)容,在二階段時恢復(fù)出一條完整的普通消息。
如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。Broker端對未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài),進而執(zhí)行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務(wù)消息的回查并且推進CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
需要注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認回滾該消息。

五、源碼分析

1.客服端發(fā)送事務(wù)消息

RocketMQ事務(wù)消息由TransactionMQProducer實現(xiàn),繼承DefaultMQProducer實現(xiàn)了發(fā)送事務(wù)消息的能力。

發(fā)送事務(wù)消息會調(diào)用TransactionMQProducer的sendMessageInTransaction方法:

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

檢查有沒有配置事務(wù)監(jiān)聽器,監(jiān)聽器提供了兩個方法:

  • executeLocalTransaction:執(zhí)行本地事務(wù)
  • checkLocalTransaction:回查本地事務(wù)

然后調(diào)用DefaultMQProducerImpl執(zhí)行發(fā)送:

public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    //...省略
    SendResult sendResult = null;
    //msg設(shè)置參數(shù)TRAN_MSG,表示為事務(wù)消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        //發(fā)送消息
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                //通過LocalTransactionExecutor執(zhí)行,已經(jīng)廢棄
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    //消息發(fā)送成功,執(zhí)行本地事務(wù)
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
            } catch (Throwable e) {
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    try {
        //執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    //省略...
    return transactionSendResult;
}

該方法做了以下幾件事情:

  • 給消息打上事務(wù)屬性,用于broker區(qū)分普通消息和事務(wù)消息
  • 發(fā)送半消息(half message)
  • 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
  • 執(zhí)行endTransaction方法,通知broker 執(zhí)行 commit/rollback

發(fā)送消息會正常調(diào)用DefaultMQProducerImpl的發(fā)送消息邏輯,執(zhí)行本地事務(wù)通過transactionListener調(diào)用本地的事務(wù)邏輯,我們看一下結(jié)束事務(wù)endTransaction方法實現(xiàn):

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。
如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。

2.Broker處理事務(wù)消息

RocketMQ服務(wù)端有個NettyRequestProcessor接口,類似于spring的BeanPostProcessor,broker啟動的時候會把對應(yīng)的實現(xiàn)注冊到NettyRemotingServer的本地緩存processorTable中,在收到producer發(fā)送的消息會調(diào)用NettyServerHandler的channelRead0方法,然后會調(diào)用對應(yīng)的NettyRequestProcessor實現(xiàn)處理接收到的消息請求。看一下SendMessageProcessor實現(xiàn):

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext traceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            traceContext = buildMsgContext(ctx, requestHeader);
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            traceContext.setCommercialOwner(owner);
            try {
                this.executeSendMessageHookBefore(ctx, request, traceContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }
            RemotingCommand response;
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            } else {
                response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }
            return response;
    }
}

會調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進行半消息存儲:

public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
    //...省略
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }
    long beginTimeMillis = this.brokerController.getMessageStore().now();
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
    	//...異步發(fā)送
        return null;
    } else {
        PutMessageResult putMessageResult = null;
        if (sendTransactionPrepareMessage) {
        	//存儲事務(wù)消息
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
        	//存儲普通消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        return response;
    }
}

繼續(xù)看事務(wù)半消息存儲實現(xiàn)prepareMessage:

public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

備份消息的原主題名稱與原隊列ID,然后取消事務(wù)消息的消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。
到這里,broker就初步處理完了 Producer 發(fā)送的事務(wù)半消息。
當(dāng)客戶端TransactionMQProducer執(zhí)行endTransaction動作時,觸發(fā)broker事務(wù)消息的二階段提交,broker會執(zhí)行EndTransactionProcessor的processRequest方法:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    //...省略
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

邏輯很清晰,其核心實現(xiàn)如下:

  • 根據(jù)commitlogOffset找到消息
  • 如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
  • 回滾消息,則直接將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理

還有一種情況,如果本地事務(wù)執(zhí)行結(jié)果是UNKNOW或者由于網(wǎng)絡(luò)問題沒有提交,那么存儲的broker的事務(wù)消息處于漂浮狀態(tài),無法主動轉(zhuǎn)換成可消費或者刪除狀態(tài),那么就需要broker有一種兜底機制來處理這種場景,當(dāng)然RocketMQ提供了一種補償機制,定時回查此類消息,由TransactionalMessageCheckService實現(xiàn):

@Override
public void run() {
    log.info("Start transaction check service thread!");
    while (!this.isStopped()) {
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

整體流程如下圖:

六、總結(jié)與思考

異常情況覆蓋

  • 客戶端producer發(fā)送半消息失敗

可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 發(fā)送半消息(prepare)失敗??蛻舳朔?wù)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等。

  • 半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗

如果producer發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。這種情況下,執(zhí)行本地事務(wù)失敗后,會返回rollback給 MQ,MQ會刪除之前發(fā)送的半消息。不會下發(fā)指令給下游依賴。

  • 半消息投遞成功,沒收到MQ返回的ack

如果客戶端發(fā)送半消息成功后,沒有收到MQ返回的響應(yīng)??赡苁且驗榫W(wǎng)絡(luò)問題,或者其他未知異常,客戶端以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。這個時候其實mq已經(jīng)保存半消息成功了,那這個消息怎么處理?
這個時候broker的補償邏輯上場,消息回查定時任務(wù)TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時MQ就會發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時就要發(fā)送rollback請求給mq,刪除之前的半消息。

  • commit/rollback失敗

這個也是通過定時任務(wù)TransactionalMessageCheckService來做補償,它發(fā)現(xiàn)這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務(wù)。

缺點和替代方案

事務(wù)消息很好了解決了分布式事務(wù)場景的業(yè)務(wù)解耦,但是也存在一些問題,比如引入新的組件依賴,并且事務(wù)消息是強依賴,那么還有沒有其他比較可行的替代方案,ebay提出的本地消息表是一種解決方案,消息生產(chǎn)方新增消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個事務(wù)里提交,也就是說他們要在一個數(shù)據(jù)庫里面。然后消息會經(jīng)過MQ發(fā)送到消息的消費方。如果消息發(fā)送失敗,會進行重試發(fā)送。消息消費方,需要處理這個消息,并完成自己的業(yè)務(wù)邏輯。此時如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個業(yè)務(wù)補償消息,通知生產(chǎn)方進行回滾等操作。
本地消息表的優(yōu)點是避免了分布式事務(wù),實現(xiàn)了最終一致性,缺點也明顯,消息表會耦合到業(yè)務(wù)系統(tǒng)中,如果沒有封裝好的解決方案,會有很多支撐邏輯要處理。

以上就是RocketMQ事務(wù)消息使用與原理詳解的詳細內(nèi)容,更多關(guān)于RocketMQ 事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java?byte數(shù)組轉(zhuǎn)String的幾種常用方法

    java?byte數(shù)組轉(zhuǎn)String的幾種常用方法

    在Java中數(shù)組是一種非常常見的數(shù)據(jù)結(jié)構(gòu),它可以用來存儲多個相同類型的數(shù)據(jù),有時候,我們需要將數(shù)組轉(zhuǎn)換為字符串,以便于輸出或者傳遞給其他方法,這篇文章主要給大家介紹了關(guān)于java?byte數(shù)組轉(zhuǎn)String的幾種常用方法,需要的朋友可以參考下
    2024-09-09
  • 基于springboot i18n國際化后臺多種語言設(shè)置的方式

    基于springboot i18n國際化后臺多種語言設(shè)置的方式

    這篇文章主要介紹了基于springboot i18n國際化后臺多種語言設(shè)置的方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • hadoop之MapReduce框架原理

    hadoop之MapReduce框架原理

    這篇文章主要介紹了hadoop的MapReduce框架原理,MapReduce是分為兩個階段的,MapperTask階段,和ReduceTask階段。如果有感興趣的小伙伴可以借鑒參考
    2023-03-03
  • IntelliJ?IDEA?2022.1.1創(chuàng)建java項目的詳細方法步驟

    IntelliJ?IDEA?2022.1.1創(chuàng)建java項目的詳細方法步驟

    最近安裝了IntelliJ IDEA 2022.1.1,發(fā)現(xiàn)新版本的窗口還有些變化的,所以下面這篇文章主要給大家介紹了關(guān)于IntelliJ?IDEA?2022.1.1創(chuàng)建java項目的詳細方法步驟,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2022-07-07
  • SpringBoot log打印及輸出方式

    SpringBoot log打印及輸出方式

    這篇文章主要介紹了SpringBoot log打印及輸出方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • SpringBoot整合Redis管道的示例代碼

    SpringBoot整合Redis管道的示例代碼

    本文將結(jié)合實例代碼,介紹SpringBoot整合Redis管道,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-07-07
  • @RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理

    @RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理

    這篇文章主要介紹了@RequestBody 部分屬性沒有轉(zhuǎn)化成功的處理方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • SpringBoot啟動自動終止也不報錯的原因及解決

    SpringBoot啟動自動終止也不報錯的原因及解決

    這篇文章主要介紹了SpringBoot啟動自動終止也不報錯的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • IDEA maven依賴錯誤中包下面紅色波浪線

    IDEA maven依賴錯誤中包下面紅色波浪線

    這篇文章主要介紹了IDEA maven依賴錯誤中包下面紅色波浪線,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • java 創(chuàng)建線程的方法總結(jié)

    java 創(chuàng)建線程的方法總結(jié)

    這篇文章主要介紹了java 創(chuàng)建線程的方法總結(jié)的相關(guān)資料,需要的朋友可以參考下
    2017-03-03

最新評論