Spring Boot集成Seata實現(xiàn)基于AT模式的分布式事務的解決方案
1.什么是Seata?
Seata 是一款開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。
AT 模式
前提?
- 基于支持本地 ACID 事務的關系型數(shù)據(jù)庫。
- Java 應用,通過 JDBC 訪問數(shù)據(jù)庫。
整體機制?
兩階段提交協(xié)議的演變:
- 一階段:業(yè)務數(shù)據(jù)和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源。
- 二階段:
- 提交異步化,非??焖俚赝瓿伞?/li>
- 回滾通過一階段的回滾日志進行反向補償。
寫隔離
- 一階段本地事務提交前,需要確保先拿到 全局鎖 。
- 拿不到 全局鎖 ,不能提交本地事務。
- 拿 全局鎖 的嘗試被限制在一定范圍內(nèi),超出范圍將放棄,并回滾本地事務,釋放本地鎖。
以一個示例來說明: 兩個全局事務 tx1 和 tx2,分別對 a 表的 m 字段進行更新操作,m 的初始值 1000。 tx1 先開始,開啟本地事務,拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全局鎖 ,本地提交釋放本地鎖。 tx2 后開始,開啟本地事務,拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全局鎖 ,tx1 全局提交前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。
tx1 二階段全局提交,釋放 全局鎖 。tx2 拿到 全局鎖 提交本地事務。
如果 tx1 的二階段全局回滾,則 tx1 需要重新獲取該數(shù)據(jù)的本地鎖,進行反向補償?shù)母虏僮?,實現(xiàn)分支的回滾。 此時,如果 tx2 仍在等待該數(shù)據(jù)的 全局鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全局鎖 等鎖超時,放棄 全局鎖 并回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。 因為整個過程 全局鎖 在 tx1 結(jié)束前一直是被 tx1 持有的,所以不會發(fā)生 臟寫 的問題。
讀隔離
在數(shù)據(jù)庫本地事務隔離級別 讀已提交(Read Committed) 或以上的基礎上,Seata(AT 模式)的默認全局隔離級別是 讀未提交(Read Uncommitted) 。 如果應用在特定場景下,必需要求全局的 讀已提交 ,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。
SELECT FOR UPDATE 語句的執(zhí)行會申請 全局鎖 ,如果 全局鎖 被其他事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執(zhí)行)并重試。這個過程中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關數(shù)據(jù)是 已提交 的,才返回。
出于總體性能上的考慮,Seata 目前的方案并沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。
具體例子相見:What Is Seata? | Apache Seata
2.環(huán)境搭建
安裝mysql
參見代碼倉庫里面的mysql模塊里面的docker文件夾
install seta-server
version: "3.1" services: seata-server: image: seataio/seata-server:latest hostname: seata-server ports: - "7091:7091" - "8091:8091" environment: - SEATA_PORT=8091 - STORE_MODE=file
http://localhost:7091/#/Overview
default username and password is admin/admin
3.代碼工程
實驗目標
訂單服務調(diào)用庫存服務和賬戶余額服務進行相應的扣減,并且最終生成訂單
seata-order
訂單服務
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seata</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>seata-order</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-http</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.8</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
controller
package com.et.seata.order.controller; import com.et.seata.order.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.util.HashMap; import java.util.Map; @RestController public class HelloWorldController { @Autowired private OrderService orderService; @PostMapping("/create") public Map<String, Object> createOrder(@RequestParam("userId") Long userId, @RequestParam("productId") Long productId, @RequestParam("price") Integer price) throws IOException { Map<String, Object> map = new HashMap<>(); map.put("msg", "HelloWorld"); map.put("reuslt", orderService.createOrder(userId,productId,price)); return map; } }
service
package com.et.seata.order.service; import com.alibaba.fastjson.JSONObject; import com.et.seata.order.dao.OrderDao; import com.et.seata.order.dto.OrderDO; import io.seata.core.context.RootContext; import io.seata.integration.http.DefaultHttpExecutor; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * @author liuhaihua * @version 1.0 * @ClassName OrderServiceImpl * @Description todo * @date 2024/08/08/ 13:53 */ @Slf4j @Service public class OrderServiceImpl implements OrderService{ @Autowired OrderDao orderDao; @Override @GlobalTransactional // <1> public Integer createOrder(Long userId, Long productId, Integer price) throws IOException { Integer amount = 1; // 購買數(shù)量,暫時設置為 1。 log.info("[createOrder] 當前 XID: {}", RootContext.getXID()); // <2> 扣減庫存 this.reduceStock(productId, amount); // <3> 扣減余額 this.reduceBalance(userId, price); // <4> 保存訂單 log.info("[createOrder] 保存訂單"); return this.saveOrder(userId,productId,price,amount); } private Integer saveOrder(Long userId, Long productId, Integer price,Integer amount){ // <4> 保存訂單 OrderDO order = new OrderDO(); order.setUserId(userId); order.setProductId(productId); order.setPayAmount(amount * price); orderDao.saveOrder(order); log.info("[createOrder] 保存訂單: {}", order.getId()); return order.getId(); } private void reduceStock(Long productId, Integer amount) throws IOException { // 參數(shù)拼接 JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId)) .fluentPut("amount", String.valueOf(amount)); // 執(zhí)行調(diào)用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock", params, HttpResponse.class); // 解析結(jié)果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除庫存失敗"); } } private void reduceBalance(Long userId, Integer price) throws IOException { // 參數(shù)拼接 JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId)) .fluentPut("price", String.valueOf(price)); // 執(zhí)行調(diào)用 HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance", params, HttpResponse.class); // 解析結(jié)果 Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity())); if (!success) { throw new RuntimeException("扣除余額失敗"); } } }
application.yaml
server: port: 8081 # 端口 spring: application: name: order-service datasource: url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8 driver-class-name: com.mysql.jdbc.Driver username: root password: 123456 # Seata 配置項,對應 SeataProperties 類 seata: application-id: ${spring.application.name} # Seata 應用編號,默認為 ${spring.application.name} tx-service-group: ${spring.application.name}-group # Seata 事務組編號,用于 TC 集群名 # 服務配置項,對應 ServiceProperties 類 service: # 虛擬組和分組的映射 vgroup-mapping: order-service-group: default # 分組和 Seata 服務的映射 grouplist: default: 127.0.0.1:8091
seata-product
商品庫存服務
controller
package com.et.seata.product.controller; import com.et.seata.product.dto.ProductReduceStockDTO; import com.et.seata.product.service.ProductService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class ProductController { @Autowired ProductService productService; @PostMapping("/stock") public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) { log.info("[reduceStock] 收到減少庫存請求, 商品:{}, 價格:{}", productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); try { productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount()); // 正常扣除庫存,返回 true return true; } catch (Exception e) { // 失敗扣除庫存,返回 false return false; } } }
service
package com.et.seata.product.service; import com.et.seata.product.dao.ProductDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class ProductServiceImpl implements ProductService { @Autowired private ProductDao productDao; @Override @Transactional // <1> 開啟新事物 public void reduceStock(Long productId, Integer amount) throws Exception { log.info("[reduceStock] 當前 XID: {}", RootContext.getXID()); // <2> 檢查庫存 checkStock(productId, amount); log.info("[reduceStock] 開始扣減 {} 庫存", productId); // <3> 扣減庫存 int updateCount = productDao.reduceStock(productId, amount); // 扣除成功 if (updateCount == 0) { log.warn("[reduceStock] 扣除 {} 庫存失敗", productId); throw new Exception("庫存不足"); } // 扣除失敗 log.info("[reduceStock] 扣除 {} 庫存成功", productId); } private void checkStock(Long productId, Integer requiredAmount) throws Exception { log.info("[checkStock] 檢查 {} 庫存", productId); Integer stock = productDao.getStock(productId); if (stock < requiredAmount) { log.warn("[checkStock] {} 庫存不足,當前庫存: {}", productId, stock); throw new Exception("庫存不足"); } } }
dao
package com.et.seata.product.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface ProductDao { /** * 獲取庫存 * * @param productId 商品編號 * @return 庫存 */ @Select("SELECT stock FROM product WHERE id = #{productId}") Integer getStock(@Param("productId") Long productId); /** * 扣減庫存 * * @param productId 商品編號 * @param amount 扣減數(shù)量 * @return 影響記錄行數(shù) */ @Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}") int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount); }
seata-balance
用戶余額服務
controller
package com.et.seata.balance.controller; import com.et.seata.balance.dto.AccountReduceBalanceDTO; import com.et.seata.balance.service.AccountService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController @Slf4j public class AccountController { @Autowired private AccountService accountService; @PostMapping("/balance") public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) { log.info("[reduceBalance] 收到減少余額請求, 用戶:{}, 金額:{}", accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); try { accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice()); // 正??鄢囝~,返回 true return true; } catch (Exception e) { // 失敗扣除余額,返回 false return false; } } }
service
package com.et.seata.balance.service; import com.et.seata.balance.dao.AccountDao; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class AccountServiceImpl implements AccountService { @Autowired private AccountDao accountDao; @Override @Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 開啟新事物 public void reduceBalance(Long userId, Integer price) throws Exception { log.info("[reduceBalance] 當前 XID: {}", RootContext.getXID()); // <2> 檢查余額 checkBalance(userId, price); log.info("[reduceBalance] 開始扣減用戶 {} 余額", userId); // <3> 扣除余額 int updateCount = accountDao.reduceBalance(price); // 扣除成功 if (updateCount == 0) { log.warn("[reduceBalance] 扣除用戶 {} 余額失敗", userId); throw new Exception("余額不足"); } log.info("[reduceBalance] 扣除用戶 {} 余額成功", userId); } private void checkBalance(Long userId, Integer price) throws Exception { log.info("[checkBalance] 檢查用戶 {} 余額", userId); Integer balance = accountDao.getBalance(userId); if (balance < price) { log.warn("[checkBalance] 用戶 {} 余額不足,當前余額:{}", userId, balance); throw new Exception("余額不足"); } } }
dao
package com.et.seata.balance.dao; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; import org.springframework.stereotype.Repository; @Mapper @Repository public interface AccountDao { /** * 獲取賬戶余額 * * @param userId 用戶 ID * @return 賬戶余額 */ @Select("SELECT balance FROM account WHERE id = #{userId}") Integer getBalance(@Param("userId") Long userId); /** * 扣減余額 * * @param price 需要扣減的數(shù)目 * @return 影響記錄行數(shù) */ @Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}") int reduceBalance(@Param("price") Integer price); }
以上只是一些關鍵代碼,所有代碼請參見下面代碼倉庫
代碼倉庫
https://github.com/Harries/springboot-demo
4.測試
- 啟動seata-order服務
- 啟動seata-product服務
- 啟動seata-balance服務
?編輯可以看到控制臺輸出回滾日志
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
5.引用
到此這篇關于Spring Boot集成Seata實現(xiàn)基于AT模式的分布式事務的文章就介紹到這了,更多相關Spring Boot集成Seata分布式事務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
maven自動將源碼打包并發(fā)布的實現(xiàn)步驟
maven-source-plugin 提供項目自動將源碼打包并發(fā)布的功能,在需要發(fā)布源碼項目的 pom.xml 文件中添加即可,本文就來介紹一下如何設置,感興趣的可以了解一下2023-11-11Java編程rabbitMQ實現(xiàn)消息的收發(fā)
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業(yè)消息系統(tǒng),本文通過實例來給大家分享通過操作rabbitMQ實現(xiàn)消息的收發(fā),感興趣的朋友可以參考下。2017-09-09Spring Cloud Hystrix 服務容錯保護的原理實現(xiàn)
這篇文章主要介紹了Spring Cloud Hystrix 服務容錯保護的原理實現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-05-05Java C++題解leetcode886可能的二分法并查集染色法
這篇文章主要為大家介紹了Java C++題解leetcode886可能的二分法并查集染色法實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10