springboot整合rocketmq實現(xiàn)分布式事務(wù)
1 執(zhí)行流程

(1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。
(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認消息已經(jīng)發(fā)送成功,此時消息為半消息。
(3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
(4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
(5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經(jīng)過固定時間后MQ Server 將對該消息發(fā)起消息回查。
(6) 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
(7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。
2 工程

2.1 pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.2 application.yml
rocketmq:
name-server: 192.168.38.50:9876
producer:
group: transcation-group
2.3 TransactionListenerImpl
@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")
@Slf4j
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 執(zhí)行業(yè)務(wù)邏輯
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
try {
System.out.println("用戶A賬戶減500元.");
System.out.println("用戶B賬戶加500元.");
STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));
return STATE_MAP.get(transId);
}
}
2.4 SpringTransactionProducer
@Component
@Slf4j
public class SpringTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送消息
*
*/
public void sendMsg(String topic, String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);
log.info("發(fā)送成功");
}
}
2.5 SpringTxConsumer
@Component
@RocketMQMessageListener(topic = "pay_topic",
consumerGroup = "transaction-consumer-group",
selectorExpression = "*")
@Slf4j
public class SpringTxConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("接收到消息 -> {}", msg);
}
}
2.6 ProducerController
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private SpringTransactionProducer springTransactionProducer;
@GetMapping("/sendMsg")
public String sendMsg() {
springTransactionProducer.sendMsg("pay_topic", "用戶A賬戶減500元,用戶B賬戶加500元。");
return "發(fā)送成功";
}
}
2.7 RocketApplication
@SpringBootApplication
public class RocketApplication {
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class);
}
}
3 測試
3.1 正常消費測試
描述: 正常啟動及可。


3.2 回查代碼測試
描述: 執(zhí)行本地事務(wù)時添加異常,重啟測試,發(fā)現(xiàn)消費者沒有收到消息。



到此這篇關(guān)于springboot整合rocketmq實現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)springboot 分布式事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot?@Transactional事務(wù)不生效排查方式
這篇文章主要介紹了SpringBoot?@Transactional事務(wù)不生效排查方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)
這篇文章主要介紹了spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02

