RabbitMQ高級應(yīng)用之消費端限流策略basicQos詳解
業(yè)務(wù)場景
高并發(fā)情況下,隊列里面一瞬間就就積累了上萬條數(shù)據(jù),但是消費者無法同時處理這么多請求,這個時候當(dāng)我們打開客戶端,瞬間就有巨量的信息給推送過來
但是客戶端是沒有辦法同時處理這么多數(shù)據(jù)的,結(jié)果就是消費者(客戶端)掛掉了…
這種場景下我們就需要對消費端進行限流
限流策略實現(xiàn)
限流策略關(guān)鍵代碼:
channel.basicQos();
編寫生產(chǎn)者
// 生產(chǎn)者
public class Producer {
private static final String QUEUE_NAME = "queue_limit_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
channel.basicPublish("", QUEUE_NAME, null, ("消費端限流策略—測試數(shù)據(jù):" + i).getBytes());
}
channel.close();
connection.close();
}
}編寫消費者1
// 消費者1
public class Consumer {
private static final String QUEUE_NAME = "queue_limit_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1接收到信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}編寫消費者2
// 消費者2
public class Consumer2 {
private static final String QUEUE_NAME = "queue_limit_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 設(shè)置限流機制
* param1: prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
* param2: prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
* param3:global,是否將上面的設(shè)置應(yīng)用于整個通道,false表示只應(yīng)用于當(dāng)前消費者
*/
channel.basicQos(0, 5, false);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2接收到信息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}運行結(jié)果


小結(jié)
- 限流的核心代碼就是
channel.basicQos(); - 限流情況 ack 不能設(shè)置自動簽收,一定要手動簽收 channel.basicQos()
/**
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;該方法的作用是:進行消費端的限流
- param1:prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
- param2:prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
- param3:global,是否將上面的設(shè)置應(yīng)用于整個通道
- false:表示只應(yīng)用于當(dāng)前消費者
- true:表示當(dāng)前通道的所有消費者都應(yīng)用這個限流策略
到此這篇關(guān)于RabbitMQ高級應(yīng)用之消費端限流策略basicQos詳解的文章就介紹到這了,更多相關(guān)RabbitMQ消費端限流策略basicQos內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot使用validation-api實現(xiàn)對枚舉類參數(shù)校驗的方法
這篇文章主要介紹了SpringBoot使用validation-api實現(xiàn)對枚舉類參數(shù)校驗,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
springboot結(jié)合vue實現(xiàn)增刪改查及分頁查詢
本文主要介紹了springboot結(jié)合vue實現(xiàn)增刪改查及分頁查詢,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09
使用Apache Ignite實現(xiàn)Java數(shù)據(jù)網(wǎng)格
今天我們來探討如何使用Apache Ignite來實現(xiàn)Java數(shù)據(jù)網(wǎng)格,Apache Ignite是一個高性能的內(nèi)存計算平臺,它提供了分布式緩存、數(shù)據(jù)網(wǎng)格和計算功能,可以顯著提高大規(guī)模應(yīng)用的數(shù)據(jù)處理性能,感興趣的小伙伴跟著小編一起來看看吧2024-08-08
SpringBoot2 task scheduler 定時任務(wù)調(diào)度器四種方式
這篇文章主要介紹了SpringBoot2 task scheduler 定時任務(wù)調(diào)度器四種方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03

