RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化
目錄結構

導入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手動確認模式
prefetch: 1 # 每次消費僅1條消息
業(yè)務邏輯
為了驗證是否一定要手動確認才能真正消費消息,如下我進行了測試:首先我先讓生產(chǎn)者生產(chǎn)兩條消息在隊列當中,如下圖1所示。其次再看代碼邏輯。當啟動了消費者代碼后入下圖2所示:雖然確確實實被限流了,有一條未確認的消息,但當我們關閉消費者端的應用后,就又會變成圖1所示。

圖1
/**
* 消費者的限流機制
* 1、確保Ack機制為手動機制:acknowledge-mode: manual
* 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認完后才會拉取下一條消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消費者接受的消息為:" + new String(message.getBody()));
}
}
圖2
所以可以當我們再次改變業(yè)務邏輯:進行手動確認后就可以發(fā)現(xiàn)消息確確實實被消費了,如圖3所示。要注意哈:第二個是否批量簽收參數(shù)表示的是開啟消費者后是否只會讀取一次消息,而消費者限流prefetch表示的是每次讀取只能為一條消息。兩者的概念是不一樣的。
/**
* 消費者的限流機制
* 1、確保Ack機制為手動機制:acknowledge-mode: manual
* 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認完后才會拉取下一條消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(5000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();// 消息的唯一標識id
System.out.println("消費者接受的消息為:" + new String(message.getBody()));
channel.basicAck(deliveryTag,true);//每5s讀一次消息(限流后每次為一條)
}
}到此這篇關于RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化的文章就介紹到這了,更多相關RabbitMQ消費者限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別
這篇文章主要介紹了Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別,文章為榮啊主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09
SpringBoot使用mybatis-plus分頁查詢無效的問題解決
MyBatis-Plus提供了很多便捷的功能,包括分頁查詢,本文主要介紹了SpringBoot使用mybatis-plus分頁查詢無效的問題解決,具有一定的參考價值,感興趣的可以了解一下2023-12-12
springboot如何接收復雜參數(shù)(同時接收JSON與文件)
文章介紹了在Spring Boot中同時處理JSON和文件上傳時使用`@RequestPart`注解的方法,`@RequestPart`可以接收多種格式的參數(shù),包括JSON和文件,并且可以作為`multipart/form-data`格式中的key2025-02-02
SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例
本文主要介紹了SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-07-07

