RabbitMQ實現消費端限流的步驟
概述
在 RabbitMQ 中,可以通過消費者端限流(Consumer Prefetch)來控制消費端處理消息的速度,以避免消費端處理能力不足或處理過慢而導致消息堆積。消費者端限流的主要目的是控制消費者每次從 RabbitMQ 中獲取的消息數量,從而實現消息處理的流量控制。
RabbitMQ 提供了一種 QOS(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息還未被消費確認,則不進行新消息的消費。
RabbitMQ 為我們提供了三種機制
● 對內存和磁盤使用量設置閾值
● 于credit flow 的流控機制
● QoS保證機制
channel.basicQos()
channel.basicQos(int prefetchSize,int prefetchCount,boolean global)
一定要注意的是,如果做限流,那么no_ask是要設置為false,也就是手工簽收而不是自動簽收的情況下才可以做限流。
參數:
prefetchSize:消息的大小
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack
global:是否將上面設置應用于channel,簡單點說,就是上面限制是channel級別的還是consumer級別
注意,prefetchSize和golobal參數還沒有實現。
Channel的詳細介紹:
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關部分邏輯。
ConnectionFactory如名稱,是客戶端與broker的tcp連接工廠,負責根據uri創(chuàng)建Connection。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業(yè)務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創(chuàng)建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
注意,rabbitmq提供了服務質量保障功能,即在非自動確認消息的前提下,如果一定數目的消息未被確認,不進行消費新的消息。也就是說,我們要使用非自動ack
@Configuration
public class DirectRabbitConfig {
public static final String DEAD_LETTER_EXCHANGE = "dead.latter.exchange";
public static final String DEAD_LETTER_QUEUE = "dead.latter.queue";
public static final String DEAD_LETTER_KEY = "dead.latter.key";
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
//實例化隊列時各個參數的含義如下:
//name:隊列名稱
// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
// exclusive:默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關閉后隊列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:默認是false,是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
//一般設置一下隊列的持久化就好,其余兩個就是默認false(消息和交換機也可以持久化,但是消息持久化的前提是需要和隊列,交換機持久化一起使用)
//為業(yè)務隊列綁定一個死信交換機,當業(yè)務隊列里的消息過期了就被轉發(fā)到死信交換機,再由死信交換機發(fā)給死信隊列處理
// Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機
// args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當前隊列的死信路由key
// args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
//5000毫秒
// args.put("x-message-ttl", 5000);
// return new Queue("TestDirectQueue",false,false,false,args);
return new Queue("TestDirectQueue",false,false,false);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
/**
* ========================死信隊列==================================
*/
// 聲明死信Exchange
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明死信隊列A
@Bean
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBindingA(){
return BindingBuilder.bind(deadLetterQueueA()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);
}
}
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
Integer index = 0;
@RabbitHandler
public void process(Channel channel,String msg) throws Exception {
++index;
channel.basicQos(0, 1, false);
//設置非自動ack
DefaultConsumer consumer = new DefaultConsumer(channel);
channel.basicConsume("TestDirectQueue", false,consumer);
//假設業(yè)務處理需要3秒,那么當消費者接受到消息的時候,只處理一條,且要處理3秒,那么在服務器堆積的多條信息就不會瘋狂涌入
Thread.sleep(3000);
System.out.println(" DirectReceiver消費者收到消息 : " + msg + ",第" +index+ "條"+"======"+ new Date().toString());
}
}RabbitMQ 中實現消費端限流的步驟
1.設置消費者端的預取值(Prefetch Count):
在創(chuàng)建消費者時,可以通過設置 basicQos(prefetchCount) 方法來指定消費者端的預取值,即每次從 RabbitMQ 中預取的消息數量。
2.確保消費者端開啟手動應答模式:
在設置預取值之前,確保消費者端已經開啟了手動應答模式(manual ack mode),這樣消費者可以自主控制何時應答消息。
3.消費者端處理消息時進行手動應答:
當消費者端接收到消息后,在處理完消息之后,需要顯式地發(fā)送應答(ack)給 RabbitMQ,表示該消息已經被消費。這樣,消費者才能繼續(xù)接收下一批消息。
下面是一個簡單的 Java 示例代碼,演示了如何在 RabbitMQ 中實現消費端限流:
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "queue_name";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 5; // 設置預取值為 5
channel.basicQos(prefetchCount);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
// 模擬消息處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手動應答消息
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}在上述代碼中,我們首先創(chuàng)建了連接和信道,并聲明了一個名為 “queue_name” 的隊列。然后,通過 channel.basicQos(prefetchCount) 方法設置了消費者端的預取值為 5。接著,我們定義了一個 DeliverCallback 回調函數,在其中處理消息并手動應答。最后,通過 channel.basicConsume() 方法啟動消費者端。
通過設置預取值和手動應答,消費者端可以控制自身處理消息的速度,有效地實現消費端的限流。希望這個示例能幫助您理解如何在 RabbitMQ 中實現消費端限流!
到此這篇關于RabbitMQ如何實現消費端限流的文章就介紹到這了,更多相關RabbitMQ消費端限流內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java?中?hashCode()?與?equals()?的關系(面試)
這篇文章主要介紹了Java中hashCode()與equals()的關系,ava中hashCode()和equals()的關系是面試中的??键c,文章對hashCode與equals的關系做出詳解,需要的小伙伴可以參考一下2022-09-09
springboot使用RedisRepository操作數據的實現
本文主要介紹了springboot使用RedisRepository操作數據的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-05-05
SpringBoot整合sharding-jdbc?實現分庫分表操作的示例代碼
在Spring?Boot中使用ShardingSphere的Sharding-JDBC來實現數據庫的分庫分表是一個常見的需求,下面就拉具體介紹一下實現步驟,需要的朋友們下面隨著小編來一起學習學習吧2025-04-04

