SpringCloud微服務開發(fā)基于RocketMQ實現(xiàn)分布式事務管理詳解
消息隊列實現(xiàn)分布式事務原理
首先讓我們來看一下基于消息隊列實現(xiàn)分布式事務的原理方案。
柔性事務
發(fā)送消息的服務有個OUTBOX數(shù)據(jù)表,在進行INSERT、UPDATE、DELETE 業(yè)務操作時也會給OUTBOX數(shù)據(jù)表INSERT一條消息記錄,這樣可以保證原子性,因為這是基于本地的ACID事務。
OUTBOX表充當臨時消息隊列,然后我們在引入一個消息中繼(MessageRelay)的服務,由他從OUTBOX表中讀取數(shù)據(jù)并發(fā)布消息到消息組件。
消息中繼的實現(xiàn)可以很簡單,只需要通過定時任務定期從OUTBOX表中拉取最新未發(fā)布的數(shù)據(jù),獲取到數(shù)據(jù)后將數(shù)據(jù)發(fā)送給消息組件,最后將完成發(fā)送的消息從OUTBOX表中刪除即可,對于失敗的消息可以根據(jù)業(yè)務規(guī)則進行重試。
RocketMQ的事務消息
RocketMQ本身已經(jīng)支持事務消息,如果你們項目使用了RocketMQ,可以直接借助RocketMQ的事務消息實現(xiàn)分布式事務,我們先看一下RocketMQ事務消息的原理然后再借助RocketMQ來實現(xiàn)分布式事務。
RocketMQ采用了2PC的思想來實現(xiàn)了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
分布式事務
RocketMQ實現(xiàn)事務消息主要分為兩個階段:正常事務的發(fā)送及提交、事務信息的補償流程
整體流程為:
正常事務發(fā)送與提交階段
1、生產者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務端響應消息寫入結果,半消息發(fā)送成功
3、開始執(zhí)行本地事務
4、根據(jù)本地事務的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務信息的補償流程
1、如果MQServer長時間沒收到本地事務的執(zhí)行狀態(tài)會向生產者發(fā)起一個確認回查的操作請求
2、生產者收到確認回查請求后,檢查本地事務的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結果執(zhí)行Commit或者Rollback操作
補償階段主要是用于解決生產者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。
RocketMQ事務流程關鍵
事務消息在一階段對用戶不可見
事務消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的,也就是說消費者不能直接消費。這里RocketMQ的實現(xiàn)方法是原消息的主題與消息消費隊列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC
,這樣由于消費者沒有訂閱這個主題,所以不會被消費。
如何處理第二階段的失敗消息?
在本地事務執(zhí)行完成后會向MQServer發(fā)送Commit或Rollback操作,此時如果在發(fā)送消息的時候生產者出故障了,那么要保證這條消息最終被消費,MQServer會像服務端發(fā)送回查請求,確認本地事務的執(zhí)行狀態(tài)。
當然了rocketmq并不會無休止的的信息事務狀態(tài)回查,默認回查15次,如果15次回查還是無法得知事務狀態(tài),RocketMQ默認回滾該消息。
消息狀態(tài) 事務消息有三種狀態(tài):TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息
TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
TransactionStatus.Unknown:中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。
代碼實現(xiàn)
業(yè)務需求:用戶請求訂單微服務order-service
接口刪除訂單(退貨),刪除訂單時需要調用account-service
的方法給賬戶增加余額,一個典型的分布式事務問題。
基礎配置
在Order-Service和Account-Service中引入Rocket消息組件
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
在配置中心添加RocketMQ的相關配置
rocketmq:
name-server: xxx.xx.x.xx:9876
producer:
group: cloud-group
在OrderService服務中建立一張事務日志表rocketmq_transaction_log(作用稍后說)
發(fā)送半消息
Order-Service作為分布式事務開始的入口,在Service層我們給RocketMQ發(fā)送一條半消息
OrderController入口
/** * 根據(jù)訂單號刪除訂單 * @param orderNo 訂單編號 */ @PostMapping("/order/delete") public ResultData<String> delete(@RequestParam String orderNo){ log.info("delete order id is {}",orderNo); orderService.delete(orderNo); return ResultData.success("訂單刪除成功"); }
直接調用orderService的delete方法
OrderServiceImpl業(yè)務邏輯
@Override public void delete(String orderNo) { Order order = orderMapper.selectByNo(orderNo); //如果訂單存在且狀態(tài)為有效,進行業(yè)務處理 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) { String transactionId = UUID.randomUUID().toString(); //如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶中心消費消息 rocketMQTemplate.sendMessageInTransaction("add-amount", MessageBuilder.withPayload( UserAddMoneyDTO.builder() .userCode(order.getAccountCode()) .amount(order.getAmount()) .build() ) .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("order_id",order.getId()) .build() ,order ); } }
首先校驗一下訂單狀態(tài),然后使用rocketMQTemplate.sendMessageInTransaction()
發(fā)送事務消息。
sendMessageInTransaction方法有三個參數(shù):
- destination:目的地(主題),這里發(fā)送給
add-amount
這個topic - message:發(fā)送給消費者的消息體,需要使用
MessageBuilder.withPayload()
來構建消息 - arg:參數(shù)
注意,這里我們生成了一個transactionId,并放在header中跟消息一起發(fā)送(這里實際也可以構造成一個對象,放在arg里進行發(fā)送),作用后面再講!
消息封裝實體UserAddMoneyDTO
@Data @NoArgsConstructor @AllArgsConstructor @Builder public class UserAddMoneyDTO { /** * 用戶編碼 */ private String userCode; /** * 金額 */ private BigDecimal amount; }
這個類生產者和消費者都需要用到,所以我直接丟到common包中,大家根據(jù)項目實際情況決定放哪。
執(zhí)行本地事務與回查
MQServer收到半消息后會告訴生產者order-service確認收到半消息,這時候order-service需要執(zhí)行本地事務,執(zhí)行完本地事務后再告訴MQServer本地事務的執(zhí)行狀態(tài),確認此消息究竟是Commit還是Rollback。
RocketMQ提供了RocketMQLocalTransactionListener
接口,本地事務監(jiān)聽器,這個接口類的實現(xiàn)如下:
第一個方法executeLocalTransaction
為執(zhí)行本地事務;第二個方法checkLocalTransaction
為檢查本地事務的執(zhí)行狀態(tài),也就是回查動作。
我們需要實現(xiàn)RocketMQLocalTransactionListener
接口,在executeLocalTransaction
方法中執(zhí)行本地事務,在執(zhí)行checkLocalTransaction
回查方法時告訴RocketMQ到底該提交還是回滾。
這里大家思考一個問題,本地事務已經(jīng)執(zhí)行完成了,怎么去回查本地事務的執(zhí)行結果呢?
答案如下:我們可以在執(zhí)行本地事務的時候同時生成一條事務日志,讓本地事務與日志事務在同一個方法中,同時添加@Transactional
注解,保證兩個操作事務是一個原子操作。
這樣如果事務日志表中有這個本地事務的信息,那就代表本地事務執(zhí)行成功,需要Commit,相反如果沒有對應的事務日志,則表示執(zhí)行失敗,需要Rollback。這就是為什么我們上面在OrderService中需要建立一張事務日志表的原因。
實現(xiàn)RocketMQLocalTransactionListener
接口,完成事務執(zhí)行邏輯
/** * 監(jiān)聽事務消息 * @author javadaily */ @Slf4j @RocketMQTransactionListener @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddUserAmountListener implements RocketMQLocalTransactionListener { private final OrderService orderService; private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper; /** * 執(zhí)行本地事務 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { log.info("執(zhí)行本地事務"); MessageHeaders headers = message.getHeaders(); //獲取事務ID String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); Integer orderId = Integer.valueOf((String)headers.get("order_id")); log.info("transactionId is {}, orderId is {}",transactionId,orderId); try{ //執(zhí)行本地事務,并記錄日志 orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId); //執(zhí)行成功,可以提交事務 return RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ return RocketMQLocalTransactionState.ROLLBACK; } } /** * 本地事務的檢查,檢查本地事務是否成功 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { MessageHeaders headers = message.getHeaders(); //獲取事務ID String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.info("檢查本地事務,事務ID:{}",transactionId); //根據(jù)事務id從日志表檢索 QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("transaction_id",transactionId); RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper); if(null != rocketmqTransactionLog){ return RocketMQLocalTransactionState.COMMIT; } return RocketMQLocalTransactionState.ROLLBACK; } }
本地事務執(zhí)行邏輯
@Transactional(rollbackFor = RuntimeException.class) @Override public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){ orderMapper.changeStatus(id,status); rocketMqTransactionLogMapper.insert( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("執(zhí)行刪除訂單操作") .build() ); }
修改訂單狀態(tài)為刪除狀態(tài),同時往事務日志表中插入一條事務日志,用@Transactional注解保證事務。
Account-Service消費消息
監(jiān)聽消息并處理給用戶增加余額邏輯
@Slf4j @Service @RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group") @RequiredArgsConstructor(onConstructor = @__(@Autowired) ) public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> { private final AccountMapper accountMapper; /** * 收到消息的業(yè)務邏輯 */ @Override public void onMessage(UserAddMoneyDTO userAddMoneyDTO) { log.info("received message: {}",userAddMoneyDTO); accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount()); log.info("add money success"); } }
測試
測試數(shù)據(jù)
訂單表
用戶表
事務日志表
如果事務消息成功消費最終用戶表中jianzh5這個用戶的amount應該變成300(100+200)
測試準備
我們在執(zhí)行本地事務成功并需要通知消息隊列提交事務處打個斷點,然后在執(zhí)行到此處時手動模擬異常
模擬異常
在準備提交事務時我們通過命令taskkill /pid 10116 -t -f
命令強制殺掉OrderService進程。(先通過jps獲取OrderService進程ID)
重啟服務器,檢查是否會執(zhí)行回查方法
重啟OrderService程序會自動執(zhí)行回查方法,結合事務日志表判斷是否提交事務。
運行后的結果
小結
我們介紹了使用消息隊列實現(xiàn)柔性事務的方案,重點剖析了RocketMQ事務消息的原理,并通過Demo案例實現(xiàn)了分布式事務(柔性事務)。
到此這篇關于SpringCloud微服務開發(fā)基于RocketMQ實現(xiàn)分布式事務管理詳解的文章就介紹到這了,更多相關SpringCloud RocketMQ分布式事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring AOP的Around增強實現(xiàn)方法分析
這篇文章主要介紹了spring AOP的Around增強實現(xiàn)方法,結合實例形式分析了spring面向切面AOP的Around增強具體步驟與相關操作方法,需要的朋友可以參考下2020-01-01Mybatis批量插入大量數(shù)據(jù)的最優(yōu)方式總結
批量插入功能是我們日常工作中比較常見的業(yè)務功能之一,下面這篇文章主要給大家總結介紹了關于Mybatis批量插入大量數(shù)據(jù)的幾種最優(yōu)方式,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-03-03詳解mybatis批量插入10萬條數(shù)據(jù)的優(yōu)化過程
這篇文章主要介紹了詳解mybatis批量插入10萬條數(shù)據(jù)的優(yōu)化過程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04spring boot項目application.properties文件存放及使用介紹
這篇文章主要介紹了spring boot項目application.properties文件存放及使用介紹,我們的application.properties文件中會有很多敏感信息,大家在使用過程中要多加小心2021-06-06Java?IO及BufferedReader.readline()出現(xiàn)的Bug
這篇文章主要介紹了Java?IO及BufferedReader.readline()出現(xiàn)的Bug,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12