RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化
目錄結(jié)構(gòu)
導(dǎo)入依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.0</version> </dependency> </dependencies>
修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手動確認模式
prefetch: 1 # 每次消費僅1條消息
業(yè)務(wù)邏輯
為了驗證是否一定要手動確認才能真正消費消息,如下我進行了測試:首先我先讓生產(chǎn)者生產(chǎn)兩條消息在隊列當中,如下圖1所示。其次再看代碼邏輯。當啟動了消費者代碼后入下圖2所示:雖然確確實實被限流了,有一條未確認的消息,但當我們關(guān)閉消費者端的應(yīng)用后,就又會變成圖1所示。
圖1
/** * 消費者的限流機制 * 1、確保Ack機制為手動機制:acknowledge-mode: manual * 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認完后才會拉取下一條消息 */ @Component public class QosListener implements ChannelAwareMessageListener { @RabbitListener(queues = "test_queue_name") @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println("消費者接受的消息為:" + new String(message.getBody())); } }
圖2
所以可以當我們再次改變業(yè)務(wù)邏輯:進行手動確認后就可以發(fā)現(xiàn)消息確確實實被消費了,如圖3所示。要注意哈:第二個是否批量簽收參數(shù)表示的是開啟消費者后是否只會讀取一次消息,而消費者限流prefetch表示的是每次讀取只能為一條消息。兩者的概念是不一樣的。
/** * 消費者的限流機制 * 1、確保Ack機制為手動機制:acknowledge-mode: manual * 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認完后才會拉取下一條消息 */ @Component public class QosListener implements ChannelAwareMessageListener { @RabbitListener(queues = "test_queue_name") @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(5000); long deliveryTag = message.getMessageProperties().getDeliveryTag();// 消息的唯一標識id System.out.println("消費者接受的消息為:" + new String(message.getBody())); channel.basicAck(deliveryTag,true);//每5s讀一次消息(限流后每次為一條) } }
到此這篇關(guān)于RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化的文章就介紹到這了,更多相關(guān)RabbitMQ消費者限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別
這篇文章主要介紹了Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別,文章為榮啊主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09SpringBoot使用mybatis-plus分頁查詢無效的問題解決
MyBatis-Plus提供了很多便捷的功能,包括分頁查詢,本文主要介紹了SpringBoot使用mybatis-plus分頁查詢無效的問題解決,具有一定的參考價值,感興趣的可以了解一下2023-12-12springboot如何接收復(fù)雜參數(shù)(同時接收JSON與文件)
文章介紹了在Spring Boot中同時處理JSON和文件上傳時使用`@RequestPart`注解的方法,`@RequestPart`可以接收多種格式的參數(shù),包括JSON和文件,并且可以作為`multipart/form-data`格式中的key2025-02-02SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例
本文主要介紹了SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-07-07Java中的關(guān)鍵字_動力節(jié)點Java學院整理
關(guān)鍵字也稱為保留字,是指Java語言中規(guī)定了特定含義的標示符。對于保留字,用戶只能按照系統(tǒng)規(guī)定的方式使用,不能自行定義2017-04-04