欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Boot集成Seata實現(xiàn)基于AT模式的分布式事務的解決方案

 更新時間:2024年08月12日 10:56:06   作者:HBLOGA  
Seata 是一款開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務,這篇文章主要介紹了Spring Boot集成Seata實現(xiàn)基于AT模式的分布式事務,需要的朋友可以參考下

1.什么是Seata?

Seata 是一款開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。

AT 模式

前提?

  • 基于支持本地 ACID 事務的關系型數(shù)據(jù)庫。
  • Java 應用,通過 JDBC 訪問數(shù)據(jù)庫。

整體機制?

兩階段提交協(xié)議的演變:

  • 一階段:業(yè)務數(shù)據(jù)和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源。
  • 二階段:
    • 提交異步化,非??焖俚赝瓿伞?/li>
    • 回滾通過一階段的回滾日志進行反向補償。

 寫隔離

  • 一階段本地事務提交前,需要確保先拿到 全局鎖 。
  • 拿不到 全局鎖 ,不能提交本地事務。
  • 拿 全局鎖 的嘗試被限制在一定范圍內(nèi),超出范圍將放棄,并回滾本地事務,釋放本地鎖。

以一個示例來說明: 兩個全局事務 tx1 和 tx2,分別對 a 表的 m 字段進行更新操作,m 的初始值 1000。 tx1 先開始,開啟本地事務,拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 后開始,開啟本地事務,拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。

tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務。

如果 tx1 的二階段全局回滾,則 tx1 需要重新獲取該數(shù)據(jù)的本地鎖,進行反向補償?shù)母虏僮?,實現(xiàn)分支的回滾。 此時,如果 tx2 仍在等待該數(shù)據(jù)的 全局鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全局鎖 等鎖超時,放棄 全局鎖 并回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。 因為整個過程 全局鎖 在 tx1 結(jié)束前一直是被 tx1 持有的,所以不會發(fā)生 臟寫 的問題。

讀隔離

在數(shù)據(jù)庫本地事務隔離級別 讀已提交(Read Committed) 或以上的基礎上,Seata(AT 模式)的默認全局隔離級別是 讀未提交(Read Uncommitted) 。 如果應用在特定場景下,必需要求全局的 讀已提交 ,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。

Read Isolation: SELECT FOR UPDATE

SELECT FOR UPDATE 語句的執(zhí)行會申請 全局鎖 ,如果 全局鎖 被其他事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執(zhí)行)并重試。這個過程中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關數(shù)據(jù)是 已提交 的,才返回。

出于總體性能上的考慮,Seata 目前的方案并沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

具體例子相見:What Is Seata? | Apache Seata

2.環(huán)境搭建

安裝mysql

參見代碼倉庫里面的mysql模塊里面的docker文件夾

 install seta-server

version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:latest
    hostname: seata-server
    ports:
      - "7091:7091"
      - "8091:8091"
    environment:
      - SEATA_PORT=8091
      - STORE_MODE=file

http://localhost:7091/#/Overview

default username and password is admin/admin

3.代碼工程

實驗目標

訂單服務調(diào)用庫存服務和賬戶余額服務進行相應的扣減,并且最終生成訂單

seata-order

訂單服務

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seata</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seata-order</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-http</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.8</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

controller

package com.et.seata.order.controller;
import com.et.seata.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@RestController
public class HelloWorldController {
    @Autowired
    private OrderService orderService;
    @PostMapping("/create")
    public Map<String, Object> createOrder(@RequestParam("userId") Long userId,
                                              @RequestParam("productId") Long productId,
                                              @RequestParam("price") Integer price) throws IOException {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "HelloWorld");
        map.put("reuslt", orderService.createOrder(userId,productId,price));
        return map;
    }
}

service

package com.et.seata.order.service;
import com.alibaba.fastjson.JSONObject;
import com.et.seata.order.dao.OrderDao;
import com.et.seata.order.dto.OrderDO;
import io.seata.core.context.RootContext;
import io.seata.integration.http.DefaultHttpExecutor;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName OrderServiceImpl
 * @Description todo
 * @date 2024/08/08/ 13:53
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService{
   @Autowired
   OrderDao orderDao;
    @Override
    @GlobalTransactional // <1>
    public Integer createOrder(Long userId, Long productId, Integer price) throws IOException {
        Integer amount = 1; // 購買數(shù)量,暫時設置為 1。
        log.info("[createOrder] 當前 XID: {}", RootContext.getXID());
        // <2> 扣減庫存
        this.reduceStock(productId, amount);
        // <3> 扣減余額
        this.reduceBalance(userId, price);
        // <4> 保存訂單
        log.info("[createOrder] 保存訂單");
        return this.saveOrder(userId,productId,price,amount);
    }
    private Integer saveOrder(Long userId, Long productId, Integer price,Integer amount){
      // <4> 保存訂單
      OrderDO order = new OrderDO();
      order.setUserId(userId);
      order.setProductId(productId);
      order.setPayAmount(amount * price);
      orderDao.saveOrder(order);
      log.info("[createOrder] 保存訂單: {}", order.getId());
      return order.getId();
   }
    private void reduceStock(Long productId, Integer amount) throws IOException {
        // 參數(shù)拼接
        JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId))
                .fluentPut("amount", String.valueOf(amount));
        // 執(zhí)行調(diào)用
        HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock",
                params, HttpResponse.class);
        // 解析結(jié)果
        Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
        if (!success) {
            throw new RuntimeException("扣除庫存失敗");
        }
    }
    private void reduceBalance(Long userId, Integer price) throws IOException {
        // 參數(shù)拼接
        JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId))
                .fluentPut("price", String.valueOf(price));
        // 執(zhí)行調(diào)用
        HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance",
                params, HttpResponse.class);
        // 解析結(jié)果
        Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
        if (!success) {
            throw new RuntimeException("扣除余額失敗");
        }
    }
}

application.yaml

server:
  port: 8081 # 端口
spring:
  application:
    name: order-service
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456
# Seata 配置項,對應 SeataProperties 類
seata:
  application-id: ${spring.application.name} # Seata 應用編號,默認為 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # Seata 事務組編號,用于 TC 集群名
  # 服務配置項,對應 ServiceProperties 類
  service:
    # 虛擬組和分組的映射
    vgroup-mapping:
      order-service-group: default
    # 分組和 Seata 服務的映射
    grouplist:
      default: 127.0.0.1:8091

seata-product

商品庫存服務

controller

package com.et.seata.product.controller;
import com.et.seata.product.dto.ProductReduceStockDTO;
import com.et.seata.product.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProductController {
   @Autowired
   ProductService productService;
   @PostMapping("/stock")
   public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) {
      log.info("[reduceStock] 收到減少庫存請求, 商品:{}, 價格:{}", productReduceStockDTO.getProductId(),
            productReduceStockDTO.getAmount());
      try {
         productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount());
         // 正常扣除庫存,返回 true
         return true;
      } catch (Exception e) {
         // 失敗扣除庫存,返回 false
         return false;
      }
   }
}

service

package com.et.seata.product.service;
import com.et.seata.product.dao.ProductDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class ProductServiceImpl implements ProductService {
    @Autowired
    private ProductDao productDao;
    @Override
    @Transactional // <1> 開啟新事物
    public void reduceStock(Long productId, Integer amount) throws Exception {
        log.info("[reduceStock] 當前 XID: {}", RootContext.getXID());
        // <2> 檢查庫存
        checkStock(productId, amount);
        log.info("[reduceStock] 開始扣減 {} 庫存", productId);
        // <3> 扣減庫存
        int updateCount = productDao.reduceStock(productId, amount);
        // 扣除成功
        if (updateCount == 0) {
            log.warn("[reduceStock] 扣除 {} 庫存失敗", productId);
            throw new Exception("庫存不足");
        }
        // 扣除失敗
        log.info("[reduceStock] 扣除 {} 庫存成功", productId);
    }
    private void checkStock(Long productId, Integer requiredAmount) throws Exception {
        log.info("[checkStock] 檢查 {} 庫存", productId);
        Integer stock = productDao.getStock(productId);
        if (stock < requiredAmount) {
            log.warn("[checkStock] {} 庫存不足,當前庫存: {}", productId, stock);
            throw new Exception("庫存不足");
        }
    }
}

dao

package com.et.seata.product.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface ProductDao {
    /**
     * 獲取庫存
     *
     * @param productId 商品編號
     * @return 庫存
     */
    @Select("SELECT stock FROM product WHERE id = #{productId}")
    Integer getStock(@Param("productId") Long productId);
    /**
     * 扣減庫存
     *
     * @param productId 商品編號
     * @param amount    扣減數(shù)量
     * @return 影響記錄行數(shù)
     */
    @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}")
    int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount);
}

seata-balance

用戶余額服務

controller

package com.et.seata.balance.controller;
import com.et.seata.balance.dto.AccountReduceBalanceDTO;
import com.et.seata.balance.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class AccountController {
   @Autowired
   private AccountService accountService;
   @PostMapping("/balance")
   public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) {
      log.info("[reduceBalance] 收到減少余額請求, 用戶:{}, 金額:{}", accountReduceBalanceDTO.getUserId(),
            accountReduceBalanceDTO.getPrice());
      try {
         accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice());
         // 正??鄢囝~,返回 true
         return true;
      } catch (Exception e) {
         // 失敗扣除余額,返回 false
         return false;
      }
   }
}

service

package com.et.seata.balance.service;
import com.et.seata.balance.dao.AccountDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountDao accountDao;
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 開啟新事物
    public void reduceBalance(Long userId, Integer price) throws Exception {
        log.info("[reduceBalance] 當前 XID: {}", RootContext.getXID());
        // <2> 檢查余額
        checkBalance(userId, price);
        log.info("[reduceBalance] 開始扣減用戶 {} 余額", userId);
        // <3> 扣除余額
        int updateCount = accountDao.reduceBalance(price);
        // 扣除成功
        if (updateCount == 0) {
            log.warn("[reduceBalance] 扣除用戶 {} 余額失敗", userId);
            throw new Exception("余額不足");
        }
        log.info("[reduceBalance] 扣除用戶 {} 余額成功", userId);
    }
    private void checkBalance(Long userId, Integer price) throws Exception {
        log.info("[checkBalance] 檢查用戶 {} 余額", userId);
        Integer balance = accountDao.getBalance(userId);
        if (balance < price) {
            log.warn("[checkBalance] 用戶 {} 余額不足,當前余額:{}", userId, balance);
            throw new Exception("余額不足");
        }
    }
}

dao

package com.et.seata.balance.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface AccountDao {
    /**
     * 獲取賬戶余額
     *
     * @param userId 用戶 ID
     * @return 賬戶余額
     */
    @Select("SELECT balance FROM account WHERE id = #{userId}")
    Integer getBalance(@Param("userId") Long userId);
    /**
     * 扣減余額
     *
     * @param price 需要扣減的數(shù)目
     * @return 影響記錄行數(shù)
     */
    @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}")
    int reduceBalance(@Param("price") Integer price);
}

以上只是一些關鍵代碼,所有代碼請參見下面代碼倉庫

代碼倉庫

https://github.com/Harries/springboot-demo

4.測試

  • 啟動seata-order服務
  • 啟動seata-product服務
  • 啟動seata-balance服務

?編輯可以看到控制臺輸出回滾日志

2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked

5.引用

https://seata.apache.org

到此這篇關于Spring Boot集成Seata實現(xiàn)基于AT模式的分布式事務的文章就介紹到這了,更多相關Spring Boot集成Seata分布式事務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 詳解Java目錄操作與文件操作教程

    詳解Java目錄操作與文件操作教程

    本章具體介紹了目錄操作、文件操作的基本使用方法和常用函數(shù),圖解穿插代碼實現(xiàn),感興趣的朋友來看看吧
    2022-03-03
  • Java中的LinkedHashMap詳解

    Java中的LinkedHashMap詳解

    這篇文章主要介紹了Java中的LinkedHashMap詳解,LinkedHashMap繼承自HashMap,它的多種操作都是建立在HashMap操作的基礎上的,同HashMap不同的是,LinkedHashMap維護了一個Entry的雙向鏈表,保證了插入的Entry中的順序,需要的朋友可以參考下
    2023-09-09
  • Spring Security基于JWT登錄認證的項目實踐

    Spring Security基于JWT登錄認證的項目實踐

    JWT被用來在身份提供者和服務提供者間傳遞被認證的用戶身份信息,本文主要介紹了Spring Security基于JWT登錄認證的項目實踐,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧
    2023-07-07
  • maven自動將源碼打包并發(fā)布的實現(xiàn)步驟

    maven自動將源碼打包并發(fā)布的實現(xiàn)步驟

    maven-source-plugin 提供項目自動將源碼打包并發(fā)布的功能,在需要發(fā)布源碼項目的 pom.xml 文件中添加即可,本文就來介紹一下如何設置,感興趣的可以了解一下
    2023-11-11
  • Java編程rabbitMQ實現(xiàn)消息的收發(fā)

    Java編程rabbitMQ實現(xiàn)消息的收發(fā)

    RabbitMQ是一個在AMQP基礎上完成的,可復用的企業(yè)消息系統(tǒng),本文通過實例來給大家分享通過操作rabbitMQ實現(xiàn)消息的收發(fā),感興趣的朋友可以參考下。
    2017-09-09
  • Spring Cloud Hystrix 服務容錯保護的原理實現(xiàn)

    Spring Cloud Hystrix 服務容錯保護的原理實現(xiàn)

    這篇文章主要介紹了Spring Cloud Hystrix 服務容錯保護的原理實現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-05-05
  • Java中的OpenJDK使用原理

    Java中的OpenJDK使用原理

    這篇文章主要介紹了Java中的OpenJDK使用原理,OpenJDK是Java的開發(fā)工具包,關于Java為什么要使用它文章作簡單介紹,感興趣的朋友可以參考一下
    2022-06-06
  • 用Eclipse生成JPA元模型的方法

    用Eclipse生成JPA元模型的方法

    下面小編就為大家?guī)硪黄肊clipse生成JPA元模型的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • SpringBoot如何使用過濾器進行XSS防御

    SpringBoot如何使用過濾器進行XSS防御

    想對全局的請求都進行XSS防御可以使用servlet中的過濾器或者spring mvc中的攔截器,下面我們就來看看如何使用servlet中的過濾器進行XSS防御吧
    2024-11-11
  • Java C++題解leetcode886可能的二分法并查集染色法

    Java C++題解leetcode886可能的二分法并查集染色法

    這篇文章主要為大家介紹了Java C++題解leetcode886可能的二分法并查集染色法實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10

最新評論