欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RocketMQ使用方法詳解

 更新時(shí)間:2025年02月24日 09:18:28   作者:莫凡的博客  
這篇文章主要介紹了RocketMQ和Kafka在SpringBoot中的使用方法,以及如何保證消息隊(duì)列的順序性、可靠性以及冪等性,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下

在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中間件,它們的使用方法有一些相似之處,也有各自的特點(diǎn)。

一、RocketMQ 在 Spring Boot 中的使用

  • 引入依賴

    • 在項(xiàng)目的pom.xml文件中添加 RocketMQ 的依賴。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    
  • 配置 RocketMQ

    • application.propertiesapplication.yml文件中配置 RocketMQ 的相關(guān)參數(shù),如 namesrvAddr(NameServer 地址)等。
    rocketmq.name-server=127.0.0.1:9876
    
  • 生產(chǎn)者

    • 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用@Resource注入RocketMQTemplate。
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RocketMQProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendMessage(String topic, String message) {
            rocketMQTemplate.convertAndSend(topic, message);
        }
    }
    
  • 消費(fèi)者

    • 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用@RocketMQMessageListener注解指定監(jiān)聽(tīng)的主題和消費(fèi)組。
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

二、Kafka 在 Spring Boot 中的使用

  • 引入依賴

    • pom.xml文件中添加 Kafka 的依賴。
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.12</version>
    </dependency>
    
  • 配置 Kafka

    • application.propertiesapplication.yml文件中配置 Kafka 的相關(guān)參數(shù),如 bootstrapServers(Kafka 服務(wù)器地址)等。
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  • 生產(chǎn)者

    • 創(chuàng)建一個(gè)生產(chǎn)者類(lèi),使用@Resource注入KafkaTemplate
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    
  • 消費(fèi)者

    • 創(chuàng)建一個(gè)消費(fèi)者類(lèi),使用@KafkaListener注解指定監(jiān)聽(tīng)的主題和消費(fèi)組。
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = "your_topic", groupId = "your_consumer_group")
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

總的來(lái)說(shuō),RocketMQ 和 Kafka 在 Spring Boot 中的使用都比較方便,具體選擇哪種消息中間件可以根據(jù)項(xiàng)目的實(shí)際需求來(lái)決定。RocketMQ 在一些場(chǎng)景下可能具有高吞吐量、低延遲等優(yōu)勢(shì),而 Kafka 則在大規(guī)模分布式系統(tǒng)中被廣泛應(yīng)用,具有高可靠性和可擴(kuò)展性。

三、如何保證消息隊(duì)列順序性

1、發(fā)送端保證順序性

  • 合理設(shè)計(jì)業(yè)務(wù)

    • 確保具有順序性要求的消息被發(fā)送到同一個(gè)主題(Topic)的同一個(gè)隊(duì)列(Queue)中。比如,將同一類(lèi)業(yè)務(wù)的消息按照特定規(guī)則進(jìn)行分類(lèi),使得它們都進(jìn)入相同的隊(duì)列。
    • 一個(gè)業(yè)務(wù)場(chǎng)景的消息盡量由一個(gè)發(fā)送端來(lái)發(fā)送消息,避免多個(gè)發(fā)送端發(fā)送可能導(dǎo)致的亂序。
  • 使用同步發(fā)送

    • 在發(fā)送消息時(shí),使用同步發(fā)送方式send(Message msg, long timeout),確保消息成功發(fā)送后再進(jìn)行下一個(gè)消息的發(fā)送。這樣可以避免異步發(fā)送可能導(dǎo)致的消息亂序情況。

2、消費(fèi)端保證順序性

  • 單線程消費(fèi)

    • 消費(fèi)者在消費(fèi)消息時(shí),采用單線程的方式進(jìn)行消費(fèi)。這樣可以確保同一隊(duì)列中的消息按照發(fā)送的順序被依次處理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 處理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

    在實(shí)際應(yīng)用中,可以將消費(fèi)邏輯放在一個(gè)單獨(dú)的方法中,然后在這個(gè)方法中進(jìn)行順序處理,確保消息的順序性。

  • 避免并發(fā)處理

    • 確保在消費(fèi)消息的過(guò)程中,不會(huì)出現(xiàn)并發(fā)處理的情況。比如,不要在消費(fèi)消息的同時(shí)啟動(dòng)其他異步任務(wù)或者多線程處理,以免破壞消息的順序性。

3、設(shè)置隊(duì)列數(shù)量

  • 控制隊(duì)列數(shù)量
    • 如果業(yè)務(wù)對(duì)消息順序性要求非常嚴(yán)格,可以考慮減少主題下的隊(duì)列數(shù)量。通常情況下,一個(gè)主題可以包含多個(gè)隊(duì)列,消息會(huì)被隨機(jī)分發(fā)到不同的隊(duì)列中。如果隊(duì)列數(shù)量較少,那么消息更有可能被發(fā)送到同一個(gè)隊(duì)列中,從而更容易保證順序性。

通過(guò)以上方法,可以在一定程度上保證 RocketMQ 消息的順序性。但需要注意的是,保證消息順序性可能會(huì)犧牲一定的性能和吞吐量,因此需要根據(jù)實(shí)際業(yè)務(wù)需求進(jìn)行權(quán)衡和選擇。

四、如何確保消息隊(duì)列的可靠性

1、發(fā)送端

  • 同步發(fā)送與確認(rèn)

    • 使用同步發(fā)送方式send(Message msg, long timeout),該方法會(huì)等待消息發(fā)送成功的確認(rèn),確保消息被正確地發(fā)送到 Broker。如果發(fā)送失敗或超時(shí),可以進(jìn)行重試或其他錯(cuò)誤處理操作。
    try {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {
        System.out.println("Failed to send message: " + e.getMessage());
        // 進(jìn)行重試或其他錯(cuò)誤處理
    }
    
  • 事務(wù)消息

    • 對(duì)于一些需要保證事務(wù)一致性的場(chǎng)景,可以使用 RocketMQ 的事務(wù)消息機(jī)制。發(fā)送事務(wù)消息分為兩個(gè)階段,首先發(fā)送半事務(wù)消息,然后執(zhí)行本地事務(wù),根據(jù)本地事務(wù)的結(jié)果決定提交或回滾事務(wù)消息。
    @Service
    public class TransactionProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendTransactionMessage() {
            TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null);
            System.out.println("Transaction message sent: " + result);
        }
    }
    

2、Broker 端

  • 持久化存儲(chǔ)

    • RocketMQ 支持消息的持久化存儲(chǔ),可以將消息存儲(chǔ)在磁盤(pán)上,以防止消息丟失。通過(guò)配置broker.conf文件中的flushDiskType參數(shù),可以選擇同步刷盤(pán)或異步刷盤(pán)方式。同步刷盤(pán)可以保證消息在寫(xiě)入磁盤(pán)后才返回成功響應(yīng),但會(huì)影響性能;異步刷盤(pán)可以提高性能,但在系統(tǒng)故障時(shí)可能會(huì)丟失部分未刷盤(pán)的消息。
  • 高可用部署

    • 部署多主多從的 RocketMQ 集群,當(dāng)主節(jié)點(diǎn)出現(xiàn)故障時(shí),從節(jié)點(diǎn)可以自動(dòng)切換為主節(jié)點(diǎn),保證消息服務(wù)的可用性。同時(shí),可以配置主從同步方式,確保消息在主從節(jié)點(diǎn)之間的可靠同步。

3、消費(fèi)端

  • 消費(fèi)確認(rèn)

    • 消費(fèi)者在成功處理消息后,需要向 Broker 發(fā)送消費(fèi)確認(rèn)??梢酝ㄟ^(guò)設(shè)置consumeModeCONSUME_PASSIVELY(被動(dòng)消費(fèi)模式),并在處理完消息后手動(dòng)調(diào)用acknowledge()方法進(jìn)行確認(rèn)。如果消費(fèi)失敗,可以選擇重試或者將消息發(fā)送到死信隊(duì)列進(jìn)行后續(xù)處理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            try {
                // 處理消息
                System.out.println("Received message: " + message);
                // 確認(rèn)消費(fèi)成功
                getRocketMQListenerContainer().acknowledge();
            } catch (Exception e) {
                System.out.println("Failed to process message: " + e.getMessage());
                // 可以選擇重試或者發(fā)送到死信隊(duì)列
            }
        }
    }
    
  • 重試機(jī)制

    • 配置消費(fèi)者的重試次數(shù)和重試時(shí)間間隔,當(dāng)消費(fèi)失敗時(shí),RocketMQ 會(huì)自動(dòng)進(jìn)行重試。可以在application.propertiesapplication.yml中配置rocketmq.retry.timesrocketmq.retry.interval參數(shù)來(lái)控制重試策略。

通過(guò)以上措施,可以在不同階段保證 RocketMQ 消息的可靠性,確保消息在生產(chǎn)、存儲(chǔ)和消費(fèi)過(guò)程中不會(huì)丟失或出現(xiàn)錯(cuò)誤。

五、保證消息處理的冪等性

在 RocketMQ 中,可以通過(guò)以下幾種方式來(lái)保證消息處理的冪等性:

1、業(yè)務(wù)層面設(shè)計(jì)

  • 使用唯一標(biāo)識(shí)

    • 在業(yè)務(wù)中為每條消息生成一個(gè)唯一的標(biāo)識(shí),比如使用業(yè)務(wù)流水號(hào)、訂單號(hào)等作為消息的唯一標(biāo)識(shí)。在消費(fèi)消息時(shí),先根據(jù)這個(gè)唯一標(biāo)識(shí)判斷該消息是否已經(jīng)被處理過(guò)。如果已經(jīng)處理過(guò),則直接忽略該消息。
    • 例如在電商系統(tǒng)中,訂單創(chuàng)建的消息可以使用訂單號(hào)作為唯一標(biāo)識(shí)。消費(fèi)者在處理消息時(shí),先查詢數(shù)據(jù)庫(kù)中是否存在該訂單號(hào)對(duì)應(yīng)的處理記錄,如果存在則說(shuō)明該消息已經(jīng)被處理過(guò),不再重復(fù)處理。
    @Service
    public class OrderProcessingService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void processOrderMessage(String orderId) {
            boolean isProcessed = isOrderProcessed(orderId);
            if (isProcessed) {
                return;
            }
            // 處理訂單邏輯
            System.out.println("Processing order: " + orderId);
            markOrderAsProcessed(orderId);
        }
    
        private boolean isOrderProcessed(String orderId) {
            int count = jdbcTemplate.queryForObject(
                    "SELECT COUNT(*) FROM processed_orders WHERE order_id =?",
                    Integer.class, orderId);
            return count > 0;
        }
    
        private void markOrderAsProcessed(String orderId) {
            jdbcTemplate.update(
                    "INSERT INTO processed_orders (order_id) VALUES (?)",
                    orderId);
        }
    }
    
  • 利用數(shù)據(jù)庫(kù)約束

    • 可以在數(shù)據(jù)庫(kù)中使用唯一索引、主鍵約束等方式來(lái)保證業(yè)務(wù)數(shù)據(jù)的唯一性。在處理消息時(shí),如果違反了這些約束,則說(shuō)明該消息已經(jīng)被處理過(guò),不再重復(fù)處理。
    • 比如在用戶注冊(cè)的場(chǎng)景中,可以在數(shù)據(jù)庫(kù)的用戶表中使用用戶名或郵箱作為唯一索引。當(dāng)消費(fèi)用戶注冊(cè)的消息時(shí),嘗試插入用戶數(shù)據(jù),如果插入失敗(因?yàn)檫`反唯一索引約束),則說(shuō)明該用戶已經(jīng)注冊(cè)過(guò),不再重復(fù)處理。
    @Service
    public class UserRegistrationService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void registerUser(String username, String password) {
            try {
                jdbcTemplate.update(
                        "INSERT INTO users (username, password) VALUES (?,?)",
                        username, password);
            } catch (DataIntegrityViolationException e) {
                // 處理插入失敗的情況,可能是用戶已存在
                System.out.println("User already exists: " + username);
            }
        }
    }
    

2、技術(shù)層面實(shí)現(xiàn)

  • 分布式鎖
    • 可以使用分布式鎖來(lái)保證同一時(shí)間只有一個(gè)消費(fèi)者實(shí)例在處理特定的消息。在處理消息之前,先獲取分布式鎖,如果獲取成功則處理消息,處理完成后釋放鎖。如果獲取鎖失敗,則說(shuō)明該消息正在被其他實(shí)例處理,當(dāng)前實(shí)例可以選擇等待或者直接忽略該消息。
    • 可以使用 Redis 或 Zookeeper 等實(shí)現(xiàn)分布式鎖。以 Redis 為例,可以使用 SETNX 命令來(lái)實(shí)現(xiàn)分布式鎖。
    @Service
    public class MessageProcessingService {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public void processMessage(String messageId) {
            String lockKey = "message_lock_" + messageId;
            boolean locked = tryLock(lockKey);
            if (!locked) {
                return;
            }
            try {
                boolean isProcessed = isMessageProcessed(messageId);
                if (isProcessed) {
                    return;
                }
                // 處理消息邏輯
                System.out.println("Processing message: " + messageId);
                markMessageAsProcessed(messageId);
            } finally {
                releaseLock(lockKey);
            }
        }
    
        private boolean tryLock(String key) {
            return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30));
        }
    
        private void releaseLock(String key) {
            redisTemplate.delete(key);
        }
    
        private boolean isMessageProcessed(String messageId) {
            // 判斷消息是否已處理的邏輯
            return false;
        }
    
        private void markMessageAsProcessed(String messageId) {
            // 標(biāo)記消息已處理的邏輯
        }
    }
    

通過(guò)以上方法,可以有效地保證 RocketMQ 消息處理的冪等性,避免因重復(fù)消費(fèi)消息而導(dǎo)致的業(yè)務(wù)數(shù)據(jù)不一致問(wèn)題。

總結(jié)

到此這篇關(guān)于Java中RocketMQ使用方法的文章就介紹到這了,更多相關(guān)Java RocketMQ使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 微信小程序與Java后端接口交互

    微信小程序與Java后端接口交互

    本文主要介紹了微信小程序與Java后端接口交互,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • 解決IDEA報(bào)錯(cuò),無(wú)效的源發(fā)行版 無(wú)效的目標(biāo)發(fā)行版:22問(wèn)題

    解決IDEA報(bào)錯(cuò),無(wú)效的源發(fā)行版 無(wú)效的目標(biāo)發(fā)行版:22問(wèn)題

    在項(xiàng)目編譯過(guò)程中,可能會(huì)出現(xiàn)“無(wú)效的源發(fā)行版”或“無(wú)效的目標(biāo)發(fā)行版”的報(bào)錯(cuò)信息,原因通常是編譯使用的JDK版本與項(xiàng)目設(shè)置的發(fā)布版本不一致,解決這類(lèi)問(wèn)題的辦法是統(tǒng)一JDK版本,具體操作為:在IDE的項(xiàng)目設(shè)置中(如File->ProjectStructure->ProjectSettings)
    2024-10-10
  • mybatis一對(duì)多方式實(shí)現(xiàn)批量插入

    mybatis一對(duì)多方式實(shí)現(xiàn)批量插入

    這篇文章主要介紹了mybatis一對(duì)多方式實(shí)現(xiàn)批量插入,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • 501 Command "HELO" requires an argument問(wèn)題的解決方法

    501 Command "HELO" requires an argument問(wèn)題的解決方法

    換一個(gè)windows服務(wù)器,發(fā)現(xiàn)就沒(méi)這樣的問(wèn)題,僅在一臺(tái)Linux服務(wù)器上可以重現(xiàn),直觀感覺(jué)就是這臺(tái)Linux服務(wù)器某些配置有問(wèn)題
    2013-08-08
  • Mybatis中特殊SQL的執(zhí)行

    Mybatis中特殊SQL的執(zhí)行

    這篇文章主要介紹了Mybatis中特殊SQL的執(zhí)行,介紹內(nèi)容包括模糊查詢、批量刪除、動(dòng)態(tài)設(shè)置表名、添加功能獲取自增的主鍵等相關(guān)資料,需要的小伙伴可以參考一下
    2022-04-04
  • Java去掉字符串最后一個(gè)逗號(hào)的方法

    Java去掉字符串最后一個(gè)逗號(hào)的方法

    Java中去掉字符串的最后一個(gè)逗號(hào)有多種實(shí)現(xiàn)方法,不同的方法適用于不同的場(chǎng)景,本文通過(guò)實(shí)例代碼介紹Java去掉字符串最后一個(gè)逗號(hào)的相關(guān)知識(shí),感興趣的朋友一起看看吧
    2023-12-12
  • Java中ArrayList類(lèi)詳細(xì)介紹

    Java中ArrayList類(lèi)詳細(xì)介紹

    這篇文章主要介紹了Java中ArrayList類(lèi)詳細(xì)介紹的相關(guān)資料,需要的朋友可以參考下
    2017-04-04
  • MyBatis-plus中的模糊查詢解讀

    MyBatis-plus中的模糊查詢解讀

    這篇文章主要介紹了MyBatis-plus中的模糊查詢解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • springBoot的日志文件詳解

    springBoot的日志文件詳解

    日志是程序的重要組成部分,主要可以用來(lái)定位和排查問(wèn)題,在程序中進(jìn)行自定義日志輸出的時(shí)候,也通常是借助于SLF4J框架來(lái)輸出日志,本文給大家分享springBoot的日志文件相關(guān)知識(shí),感興趣的朋友一起看看吧
    2024-06-06
  • SpringBoot中的統(tǒng)一異常處理詳細(xì)解析

    SpringBoot中的統(tǒng)一異常處理詳細(xì)解析

    這篇文章主要介紹了SpringBoot中的統(tǒng)一異常處理詳細(xì)解析,該注解可以把異常處理器應(yīng)用到所有控制器,而不是單個(gè)控制器,借助該注解,我們可以實(shí)現(xiàn):在獨(dú)立的某個(gè)地方,比如單獨(dú)一個(gè)類(lèi),定義一套對(duì)各種異常的處理機(jī)制,需要的朋友可以參考下
    2024-01-01

最新評(píng)論