欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot整合rocketmq實現(xiàn)分布式事務(wù)

 更新時間:2021年05月30日 09:29:01   作者:你攜秋月攬星河丶  
大多數(shù)情況下很多公司是使用消息隊列的方式實現(xiàn)分布式事務(wù)。 本篇文章重點講解springboot環(huán)境下整合rocketmq實現(xiàn)分布式事務(wù),感興趣的可以了解一下

1 執(zhí)行流程

在這里插入圖片描述

(1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。
(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認消息已經(jīng)發(fā)送成功,此時消息為半消息。
(3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
(4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。
(5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經(jīng)過固定時間后MQ Server 將對該消息發(fā)起消息回查。
(6) 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
(7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。

2 工程

在這里插入圖片描述

2.1 pom

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.0.RELEASE</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.2 application.yml

rocketmq:
  name-server: 192.168.38.50:9876
  producer:
    group: transcation-group

2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")
@Slf4j
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

    /**
     *  執(zhí)行業(yè)務(wù)邏輯
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        try {
            System.out.println("用戶A賬戶減500元.");
            System.out.println("用戶B賬戶加500元.");
            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
        }

        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.UNKNOWN;

    }

    /**
     * 回查
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));
        return STATE_MAP.get(transId);
    }
}

2.4 SpringTransactionProducer

@Component
@Slf4j
public class SpringTransactionProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 發(fā)送消息
     *
     */
    public void sendMsg(String topic, String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);
        log.info("發(fā)送成功");
    }
}

2.5 SpringTxConsumer

@Component
@RocketMQMessageListener(topic = "pay_topic",
        consumerGroup = "transaction-consumer-group",
        selectorExpression = "*")
@Slf4j
public class SpringTxConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        log.info("接收到消息 -> {}", msg);
    }
}

2.6 ProducerController

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private SpringTransactionProducer springTransactionProducer;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        springTransactionProducer.sendMsg("pay_topic", "用戶A賬戶減500元,用戶B賬戶加500元。");
        return "發(fā)送成功";
    }

}

2.7 RocketApplication

@SpringBootApplication
public class RocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class);
    }

}

3 測試

3.1 正常消費測試

描述: 正常啟動及可。

在這里插入圖片描述

在這里插入圖片描述

3.2 回查代碼測試

描述: 執(zhí)行本地事務(wù)時添加異常,重啟測試,發(fā)現(xiàn)消費者沒有收到消息。

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

到此這篇關(guān)于springboot整合rocketmq實現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)springboot 分布式事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java8新特性O(shè)ptional常用方法

    Java8新特性O(shè)ptional常用方法

    optional類是Java8新增加的一個對象容器,主要的功能有對象的創(chuàng)建、獲取、判斷、過濾,映射等,下面這篇文章主要給大家介紹了關(guān)于Java8新特性O(shè)ptional常用方法的相關(guān)資料,需要的朋友可以參考下
    2024-02-02
  • Hashmap非線程安全關(guān)于hash值沖突處理

    Hashmap非線程安全關(guān)于hash值沖突處理

    這篇文章主要為大家介紹了Hashmap非線程安全關(guān)于hash值沖突的處理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • SpringBoot?@Transactional事務(wù)不生效排查方式

    SpringBoot?@Transactional事務(wù)不生效排查方式

    這篇文章主要介紹了SpringBoot?@Transactional事務(wù)不生效排查方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • Java架構(gòu)師的5大基本能力你知道嗎

    Java架構(gòu)師的5大基本能力你知道嗎

    這篇文章主要為大家介紹了Java架構(gòu)師的基本能力,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助<BR>
    2022-01-01
  • spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)

    spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理)

    這篇文章主要介紹了spring event 事件異步處理方式(發(fā)布,監(jiān)聽,異步處理),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 淺談Java編程中的synthetic關(guān)鍵字

    淺談Java編程中的synthetic關(guān)鍵字

    這篇文章主要介紹了淺談Java編程中的synthetic關(guān)鍵字的相關(guān)內(nèi)容,包括其簡單的介紹和實例,需要的朋友可以了解下。
    2017-09-09
  • java實現(xiàn)大文件分割與合并的實例代碼

    java實現(xiàn)大文件分割與合并的實例代碼

    java實現(xiàn)大文件分割與合并的實例代碼,需要的朋友可以參考一下
    2013-03-03
  • RabbitMQ之死信隊列深入解析

    RabbitMQ之死信隊列深入解析

    這篇文章主要介紹了RabbitMQ之死信隊列深入解析,?死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取消息進行消費,需要的朋友可以參考下
    2023-09-09
  • SpringBoot詳細講解yaml配置文件的用法

    SpringBoot詳細講解yaml配置文件的用法

    這篇文章主要介紹了SpringBoot中的yaml配置文件問題,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-06-06
  • 淺析Java編程中枚舉類型的定義與使用

    淺析Java編程中枚舉類型的定義與使用

    這篇文章主要介紹了Java編程中枚舉類型的定義與使用,簡單講解了enum關(guān)鍵字與枚舉類的用法,需要的朋友可以參考下
    2016-05-05

最新評論