RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化
目錄結(jié)構(gòu)

導(dǎo)入依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手動確認(rèn)模式
prefetch: 1 # 每次消費僅1條消息
業(yè)務(wù)邏輯
為了驗證是否一定要手動確認(rèn)才能真正消費消息,如下我進行了測試:首先我先讓生產(chǎn)者生產(chǎn)兩條消息在隊列當(dāng)中,如下圖1所示。其次再看代碼邏輯。當(dāng)啟動了消費者代碼后入下圖2所示:雖然確確實實被限流了,有一條未確認(rèn)的消息,但當(dāng)我們關(guān)閉消費者端的應(yīng)用后,就又會變成圖1所示。

圖1
/**
* 消費者的限流機制
* 1、確保Ack機制為手動機制:acknowledge-mode: manual
* 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認(rèn)完后才會拉取下一條消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消費者接受的消息為:" + new String(message.getBody()));
}
}
圖2
所以可以當(dāng)我們再次改變業(yè)務(wù)邏輯:進行手動確認(rèn)后就可以發(fā)現(xiàn)消息確確實實被消費了,如圖3所示。要注意哈:第二個是否批量簽收參數(shù)表示的是開啟消費者后是否只會讀取一次消息,而消費者限流prefetch表示的是每次讀取只能為一條消息。兩者的概念是不一樣的。
/**
* 消費者的限流機制
* 1、確保Ack機制為手動機制:acknowledge-mode: manual
* 2、每次消費消息的個數(shù):prefetch: 1 只有手動確認(rèn)完后才會拉取下一條消息
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue_name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(5000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();// 消息的唯一標(biāo)識id
System.out.println("消費者接受的消息為:" + new String(message.getBody()));
channel.basicAck(deliveryTag,true);//每5s讀一次消息(限流后每次為一條)
}
}到此這篇關(guān)于RabbitMQ消費者限流實現(xiàn)消息處理優(yōu)化的文章就介紹到這了,更多相關(guān)RabbitMQ消費者限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別
這篇文章主要介紹了Synchronized?和?ReentrantLock?的實現(xiàn)原理及區(qū)別,文章為榮啊主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09
SpringBoot使用mybatis-plus分頁查詢無效的問題解決
MyBatis-Plus提供了很多便捷的功能,包括分頁查詢,本文主要介紹了SpringBoot使用mybatis-plus分頁查詢無效的問題解決,具有一定的參考價值,感興趣的可以了解一下2023-12-12
springboot如何接收復(fù)雜參數(shù)(同時接收J(rèn)SON與文件)
文章介紹了在Spring Boot中同時處理JSON和文件上傳時使用`@RequestPart`注解的方法,`@RequestPart`可以接收多種格式的參數(shù),包括JSON和文件,并且可以作為`multipart/form-data`格式中的key2025-02-02
SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例
本文主要介紹了SpringBoot同時支持HTTPS與HTTP的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Java中的關(guān)鍵字_動力節(jié)點Java學(xué)院整理
關(guān)鍵字也稱為保留字,是指Java語言中規(guī)定了特定含義的標(biāo)示符。對于保留字,用戶只能按照系統(tǒng)規(guī)定的方式使用,不能自行定義2017-04-04

