欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringCloud微服務(wù)開發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解

 更新時間:2022年09月19日 14:54:48   作者:π大星的日常  
分布式事務(wù)是在微服務(wù)開發(fā)中經(jīng)常會遇到的一個問題,之前的文章中我們已經(jīng)實(shí)現(xiàn)了利用Seata來實(shí)現(xiàn)強(qiáng)一致性事務(wù),其實(shí)還有一種廣為人知的方案就是利用消息隊(duì)列來實(shí)現(xiàn)分布式事務(wù),保證數(shù)據(jù)的最終一致性,也就是我們常說的柔性事務(wù)

消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)原理

首先讓我們來看一下基于消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)的原理方案。

柔性事務(wù)

發(fā)送消息的服務(wù)有個OUTBOX數(shù)據(jù)表,在進(jìn)行INSERT、UPDATE、DELETE 業(yè)務(wù)操作時也會給OUTBOX數(shù)據(jù)表INSERT一條消息記錄,這樣可以保證原子性,因?yàn)檫@是基于本地的ACID事務(wù)。

OUTBOX表充當(dāng)臨時消息隊(duì)列,然后我們在引入一個消息中繼(MessageRelay)的服務(wù),由他從OUTBOX表中讀取數(shù)據(jù)并發(fā)布消息到消息組件。

消息中繼的實(shí)現(xiàn)可以很簡單,只需要通過定時任務(wù)定期從OUTBOX表中拉取最新未發(fā)布的數(shù)據(jù),獲取到數(shù)據(jù)后將數(shù)據(jù)發(fā)送給消息組件,最后將完成發(fā)送的消息從OUTBOX表中刪除即可,對于失敗的消息可以根據(jù)業(yè)務(wù)規(guī)則進(jìn)行重試。

RocketMQ的事務(wù)消息

RocketMQ本身已經(jīng)支持事務(wù)消息,如果你們項(xiàng)目使用了RocketMQ,可以直接借助RocketMQ的事務(wù)消息實(shí)現(xiàn)分布式事務(wù),我們先看一下RocketMQ事務(wù)消息的原理然后再借助RocketMQ來實(shí)現(xiàn)分布式事務(wù)。

RocketMQ采用了2PC的思想來實(shí)現(xiàn)了提交事務(wù)消息,同時增加一個補(bǔ)償邏輯來處理二階段超時或者失敗的消息,如下圖所示。

分布式事務(wù)

RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程

整體流程為:

正常事務(wù)發(fā)送與提交階段

1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費(fèi)者暫時不能消費(fèi)的消息)

2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功

3、開始執(zhí)行本地事務(wù)

4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作

事務(wù)信息的補(bǔ)償流程

1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認(rèn)回查的操作請求

2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)

3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作

補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。

RocketMQ事務(wù)流程關(guān)鍵

事務(wù)消息在一階段對用戶不可見

事務(wù)消息相對普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對用戶是不可見的,也就是說消費(fèi)者不能直接消費(fèi)。這里RocketMQ的實(shí)現(xiàn)方法是原消息的主題與消息消費(fèi)隊(duì)列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC,這樣由于消費(fèi)者沒有訂閱這個主題,所以不會被消費(fèi)。

如何處理第二階段的失敗消息?

在本地事務(wù)執(zhí)行完成后會向MQServer發(fā)送Commit或Rollback操作,此時如果在發(fā)送消息的時候生產(chǎn)者出故障了,那么要保證這條消息最終被消費(fèi),MQServer會像服務(wù)端發(fā)送回查請求,確認(rèn)本地事務(wù)的執(zhí)行狀態(tài)。

當(dāng)然了rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。

消息狀態(tài) 事務(wù)消息有三種狀態(tài):TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息

TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。

TransactionStatus.Unknown:中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。

代碼實(shí)現(xiàn)

業(yè)務(wù)需求:用戶請求訂單微服務(wù)order-service接口刪除訂單(退貨),刪除訂單時需要調(diào)用account-service的方法給賬戶增加余額,一個典型的分布式事務(wù)問題。

基礎(chǔ)配置

在Order-Service和Account-Service中引入Rocket消息組件

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

在配置中心添加RocketMQ的相關(guān)配置

rocketmq:
  name-server: xxx.xx.x.xx:9876
  producer:
    group: cloud-group

在OrderService服務(wù)中建立一張事務(wù)日志表rocketmq_transaction_log(作用稍后說)

發(fā)送半消息

Order-Service作為分布式事務(wù)開始的入口,在Service層我們給RocketMQ發(fā)送一條半消息

OrderController入口

/**
 * 根據(jù)訂單號刪除訂單
 * @param orderNo 訂單編號
 */
@PostMapping("/order/delete")
public ResultData<String> delete(@RequestParam String orderNo){
 log.info("delete order id is {}",orderNo);
 orderService.delete(orderNo);
 return ResultData.success("訂單刪除成功");
}

直接調(diào)用orderService的delete方法

OrderServiceImpl業(yè)務(wù)邏輯

@Override
public void delete(String orderNo) {
 Order order = orderMapper.selectByNo(orderNo);
 //如果訂單存在且狀態(tài)為有效,進(jìn)行業(yè)務(wù)處理
 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
  String transactionId = UUID.randomUUID().toString();
  //如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶中心消費(fèi)消息
  rocketMQTemplate.sendMessageInTransaction("add-amount",
    MessageBuilder.withPayload(
      UserAddMoneyDTO.builder()
        .userCode(order.getAccountCode())
        .amount(order.getAmount())
        .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader("order_id",order.getId())
    .build()
    ,order
  );
 
 }
}

首先校驗(yàn)一下訂單狀態(tài),然后使用rocketMQTemplate.sendMessageInTransaction()發(fā)送事務(wù)消息。

sendMessageInTransaction方法有三個參數(shù):

  • destination:目的地(主題),這里發(fā)送給add-amount這個topic
  • message:發(fā)送給消費(fèi)者的消息體,需要使用MessageBuilder.withPayload()來構(gòu)建消息
  • arg:參數(shù)

注意,這里我們生成了一個transactionId,并放在header中跟消息一起發(fā)送(這里實(shí)際也可以構(gòu)造成一個對象,放在arg里進(jìn)行發(fā)送),作用后面再講!

消息封裝實(shí)體UserAddMoneyDTO

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserAddMoneyDTO {
    /**
     * 用戶編碼
     */
    private String userCode;
    /**
     * 金額
     */
    private BigDecimal amount;
}

這個類生產(chǎn)者和消費(fèi)者都需要用到,所以我直接丟到common包中,大家根據(jù)項(xiàng)目實(shí)際情況決定放哪。

執(zhí)行本地事務(wù)與回查

MQServer收到半消息后會告訴生產(chǎn)者order-service確認(rèn)收到半消息,這時候order-service需要執(zhí)行本地事務(wù),執(zhí)行完本地事務(wù)后再告訴MQServer本地事務(wù)的執(zhí)行狀態(tài),確認(rèn)此消息究竟是Commit還是Rollback。

RocketMQ提供了RocketMQLocalTransactionListener接口,本地事務(wù)監(jiān)聽器,這個接口類的實(shí)現(xiàn)如下:

第一個方法executeLocalTransaction為執(zhí)行本地事務(wù);第二個方法checkLocalTransaction為檢查本地事務(wù)的執(zhí)行狀態(tài),也就是回查動作。

我們需要實(shí)現(xiàn)RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中執(zhí)行本地事務(wù),在執(zhí)行checkLocalTransaction回查方法時告訴RocketMQ到底該提交還是回滾。

這里大家思考一個問題,本地事務(wù)已經(jīng)執(zhí)行完成了,怎么去回查本地事務(wù)的執(zhí)行結(jié)果呢?

答案如下:我們可以在執(zhí)行本地事務(wù)的時候同時生成一條事務(wù)日志,讓本地事務(wù)與日志事務(wù)在同一個方法中,同時添加@Transactional注解,保證兩個操作事務(wù)是一個原子操作。

這樣如果事務(wù)日志表中有這個本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒有對應(yīng)的事務(wù)日志,則表示執(zhí)行失敗,需要Rollback。這就是為什么我們上面在OrderService中需要建立一張事務(wù)日志表的原因。

實(shí)現(xiàn)RocketMQLocalTransactionListener接口,完成事務(wù)執(zhí)行邏輯

/**
 * 監(jiān)聽事務(wù)消息
 * @author javadaily
 */
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 執(zhí)行本地事務(wù)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("執(zhí)行本地事務(wù)");
        MessageHeaders headers = message.getHeaders();
        //獲取事務(wù)ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);
        try{
            //執(zhí)行本地事務(wù),并記錄日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //執(zhí)行成功,可以提交事務(wù)
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //獲取事務(wù)ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("檢查本地事務(wù),事務(wù)ID:{}",transactionId);
        //根據(jù)事務(wù)id從日志表檢索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

本地事務(wù)執(zhí)行邏輯

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    orderMapper.changeStatus(id,status);
    rocketMqTransactionLogMapper.insert(
        RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log("執(zhí)行刪除訂單操作")
        .build()
    );
}

修改訂單狀態(tài)為刪除狀態(tài),同時往事務(wù)日志表中插入一條事務(wù)日志,用@Transactional注解保證事務(wù)。

Account-Service消費(fèi)消息

監(jiān)聽消息并處理給用戶增加余額邏輯

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
     * 收到消息的業(yè)務(wù)邏輯
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}

測試

測試數(shù)據(jù)

訂單表

用戶表

事務(wù)日志表

如果事務(wù)消息成功消費(fèi)最終用戶表中jianzh5這個用戶的amount應(yīng)該變成300(100+200)

測試準(zhǔn)備

我們在執(zhí)行本地事務(wù)成功并需要通知消息隊(duì)列提交事務(wù)處打個斷點(diǎn),然后在執(zhí)行到此處時手動模擬異常

模擬異常

在準(zhǔn)備提交事務(wù)時我們通過命令taskkill /pid 10116 -t -f命令強(qiáng)制殺掉OrderService進(jìn)程。(先通過jps獲取OrderService進(jìn)程ID)

重啟服務(wù)器,檢查是否會執(zhí)行回查方法

重啟OrderService程序會自動執(zhí)行回查方法,結(jié)合事務(wù)日志表判斷是否提交事務(wù)。

運(yùn)行后的結(jié)果

小結(jié)

我們介紹了使用消息隊(duì)列實(shí)現(xiàn)柔性事務(wù)的方案,重點(diǎn)剖析了RocketMQ事務(wù)消息的原理,并通過Demo案例實(shí)現(xiàn)了分布式事務(wù)(柔性事務(wù))。

到此這篇關(guān)于SpringCloud微服務(wù)開發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解的文章就介紹到這了,更多相關(guān)SpringCloud RocketMQ分布式事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 深入解析Java編程中的抽象類

    深入解析Java編程中的抽象類

    這篇文章主要介紹了Java編程中的抽象類,抽象類體現(xiàn)了Java面向?qū)ο缶幊痰奶匦?需要的朋友可以參考下
    2015-10-10
  • Java中MyBatis的動態(tài)語句詳解

    Java中MyBatis的動態(tài)語句詳解

    這篇文章主要介紹了Java中MyBatis的動態(tài)語句詳解,動態(tài) SQL 是 MyBatis 的強(qiáng)大特性之一,通過不同參數(shù)生成不同的 SQL,可以動態(tài)地對數(shù)據(jù)持久層進(jìn)行操作,而不需要每個數(shù)據(jù)訪問操作都要進(jìn)行手動地拼接 SQL 語句,需要的朋友可以參考下
    2023-08-08
  • spring AOP的Around增強(qiáng)實(shí)現(xiàn)方法分析

    spring AOP的Around增強(qiáng)實(shí)現(xiàn)方法分析

    這篇文章主要介紹了spring AOP的Around增強(qiáng)實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了spring面向切面AOP的Around增強(qiáng)具體步驟與相關(guān)操作方法,需要的朋友可以參考下
    2020-01-01
  • Spring Boot詳解配置文件有哪些作用與細(xì)則

    Spring Boot詳解配置文件有哪些作用與細(xì)則

    SpringBoot項(xiàng)目是一個標(biāo)準(zhǔn)的Maven項(xiàng)目,它的配置文件需要放在src/main/resources/下,其文件名必須為application,其存在兩種文件形式,分別是properties和yaml(或者yml)文件
    2022-07-07
  • Mybatis批量插入大量數(shù)據(jù)的最優(yōu)方式總結(jié)

    Mybatis批量插入大量數(shù)據(jù)的最優(yōu)方式總結(jié)

    批量插入功能是我們?nèi)粘9ぷ髦斜容^常見的業(yè)務(wù)功能之一,下面這篇文章主要給大家總結(jié)介紹了關(guān)于Mybatis批量插入大量數(shù)據(jù)的幾種最優(yōu)方式,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-03-03
  • 詳解mybatis批量插入10萬條數(shù)據(jù)的優(yōu)化過程

    詳解mybatis批量插入10萬條數(shù)據(jù)的優(yōu)化過程

    這篇文章主要介紹了詳解mybatis批量插入10萬條數(shù)據(jù)的優(yōu)化過程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • spring boot項(xiàng)目application.properties文件存放及使用介紹

    spring boot項(xiàng)目application.properties文件存放及使用介紹

    這篇文章主要介紹了spring boot項(xiàng)目application.properties文件存放及使用介紹,我們的application.properties文件中會有很多敏感信息,大家在使用過程中要多加小心
    2021-06-06
  • Java?IO及BufferedReader.readline()出現(xiàn)的Bug

    Java?IO及BufferedReader.readline()出現(xiàn)的Bug

    這篇文章主要介紹了Java?IO及BufferedReader.readline()出現(xiàn)的Bug,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Spring中的BeanFactory工廠詳細(xì)解析

    Spring中的BeanFactory工廠詳細(xì)解析

    這篇文章主要介紹了Spring中的BeanFactory工廠詳細(xì)解析,Spring的本質(zhì)是一個bean工廠(beanFactory)或者說bean容器,它按照我們的要求,生產(chǎn)我們需要的各種各樣的bean,提供給我們使用,需要的朋友可以參考下
    2023-12-12
  • fastjson生成json時Null屬性不顯示的解決方法

    fastjson生成json時Null屬性不顯示的解決方法

    下面小編就為大家?guī)硪黄猣astjson生成json時Null屬性不顯示的解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02

最新評論