RabbitMQ實(shí)現(xiàn)消費(fèi)端限流的步驟
概述
在 RabbitMQ 中,可以通過消費(fèi)者端限流(Consumer Prefetch)來控制消費(fèi)端處理消息的速度,以避免消費(fèi)端處理能力不足或處理過慢而導(dǎo)致消息堆積。消費(fèi)者端限流的主要目的是控制消費(fèi)者每次從 RabbitMQ 中獲取的消息數(shù)量,從而實(shí)現(xiàn)消息處理的流量控制。
RabbitMQ 提供了一種 QOS(服務(wù)質(zhì)量保證)功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息還未被消費(fèi)確認(rèn),則不進(jìn)行新消息的消費(fèi)。
RabbitMQ 為我們提供了三種機(jī)制
● 對內(nèi)存和磁盤使用量設(shè)置閾值
● 于credit flow 的流控機(jī)制
● QoS保證機(jī)制
channel.basicQos()
channel.basicQos(int prefetchSize,int prefetchCount,boolean global)
一定要注意的是,如果做限流,那么no_ask是要設(shè)置為false,也就是手工簽收而不是自動簽收的情況下才可以做限流。
參數(shù):
prefetchSize:消息的大小
prefetchCount:會告訴RabbitMQ不要同時(shí)給一個消費(fèi)者推送多于N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack
global:是否將上面設(shè)置應(yīng)用于channel,簡單點(diǎn)說,就是上面限制是channel級別的還是consumer級別
注意,prefetchSize和golobal參數(shù)還沒有實(shí)現(xiàn)。
Channel的詳細(xì)介紹:
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。
ConnectionFactory如名稱,是客戶端與broker的tcp連接工廠,負(fù)責(zé)根據(jù)uri創(chuàng)建Connection。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業(yè)務(wù)操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷。
注意,rabbitmq提供了服務(wù)質(zhì)量保障功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息未被確認(rèn),不進(jìn)行消費(fèi)新的消息。也就是說,我們要使用非自動ack
@Configuration
public class DirectRabbitConfig {
public static final String DEAD_LETTER_EXCHANGE = "dead.latter.exchange";
public static final String DEAD_LETTER_QUEUE = "dead.latter.queue";
public static final String DEAD_LETTER_KEY = "dead.latter.key";
//隊(duì)列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
//實(shí)例化隊(duì)列時(shí)各個參數(shù)的含義如下:
//name:隊(duì)列名稱
// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效
// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:默認(rèn)是false,是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。
//一般設(shè)置一下隊(duì)列的持久化就好,其余兩個就是默認(rèn)false(消息和交換機(jī)也可以持久化,但是消息持久化的前提是需要和隊(duì)列,交換機(jī)持久化一起使用)
//為業(yè)務(wù)隊(duì)列綁定一個死信交換機(jī),當(dāng)業(yè)務(wù)隊(duì)列里的消息過期了就被轉(zhuǎn)發(fā)到死信交換機(jī),再由死信交換機(jī)發(fā)給死信隊(duì)列處理
// Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
// args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
// args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
//5000毫秒
// args.put("x-message-ttl", 5000);
// return new Queue("TestDirectQueue",false,false,false,args);
return new Queue("TestDirectQueue",false,false,false);
}
//Direct交換機(jī) 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
/**
* ========================死信隊(duì)列==================================
*/
// 聲明死信Exchange
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明死信隊(duì)列A
@Bean
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBindingA(){
return BindingBuilder.bind(deadLetterQueueA()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);
}
}
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊(duì)列名稱 TestDirectQueue
public class DirectReceiver {
Integer index = 0;
@RabbitHandler
public void process(Channel channel,String msg) throws Exception {
++index;
channel.basicQos(0, 1, false);
//設(shè)置非自動ack
DefaultConsumer consumer = new DefaultConsumer(channel);
channel.basicConsume("TestDirectQueue", false,consumer);
//假設(shè)業(yè)務(wù)處理需要3秒,那么當(dāng)消費(fèi)者接受到消息的時(shí)候,只處理一條,且要處理3秒,那么在服務(wù)器堆積的多條信息就不會瘋狂涌入
Thread.sleep(3000);
System.out.println(" DirectReceiver消費(fèi)者收到消息 : " + msg + ",第" +index+ "條"+"======"+ new Date().toString());
}
}RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流的步驟
1.設(shè)置消費(fèi)者端的預(yù)取值(Prefetch Count):
在創(chuàng)建消費(fèi)者時(shí),可以通過設(shè)置 basicQos(prefetchCount) 方法來指定消費(fèi)者端的預(yù)取值,即每次從 RabbitMQ 中預(yù)取的消息數(shù)量。
2.確保消費(fèi)者端開啟手動應(yīng)答模式:
在設(shè)置預(yù)取值之前,確保消費(fèi)者端已經(jīng)開啟了手動應(yīng)答模式(manual ack mode),這樣消費(fèi)者可以自主控制何時(shí)應(yīng)答消息。
3.消費(fèi)者端處理消息時(shí)進(jìn)行手動應(yīng)答:
當(dāng)消費(fèi)者端接收到消息后,在處理完消息之后,需要顯式地發(fā)送應(yīng)答(ack)給 RabbitMQ,表示該消息已經(jīng)被消費(fèi)。這樣,消費(fèi)者才能繼續(xù)接收下一批消息。
下面是一個簡單的 Java 示例代碼,演示了如何在 RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流:
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "queue_name";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 5; // 設(shè)置預(yù)取值為 5
channel.basicQos(prefetchCount);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
// 模擬消息處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手動應(yīng)答消息
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}在上述代碼中,我們首先創(chuàng)建了連接和信道,并聲明了一個名為 “queue_name” 的隊(duì)列。然后,通過 channel.basicQos(prefetchCount) 方法設(shè)置了消費(fèi)者端的預(yù)取值為 5。接著,我們定義了一個 DeliverCallback 回調(diào)函數(shù),在其中處理消息并手動應(yīng)答。最后,通過 channel.basicConsume() 方法啟動消費(fèi)者端。
通過設(shè)置預(yù)取值和手動應(yīng)答,消費(fèi)者端可以控制自身處理消息的速度,有效地實(shí)現(xiàn)消費(fèi)端的限流。希望這個示例能幫助您理解如何在 RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流!
到此這篇關(guān)于RabbitMQ如何實(shí)現(xiàn)消費(fèi)端限流的文章就介紹到這了,更多相關(guān)RabbitMQ消費(fèi)端限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java?中?hashCode()?與?equals()?的關(guān)系(面試)
這篇文章主要介紹了Java中hashCode()與equals()的關(guān)系,ava中hashCode()和equals()的關(guān)系是面試中的??键c(diǎn),文章對hashCode與equals的關(guān)系做出詳解,需要的小伙伴可以參考一下2022-09-09
springboot使用RedisRepository操作數(shù)據(jù)的實(shí)現(xiàn)
本文主要介紹了springboot使用RedisRepository操作數(shù)據(jù)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05
使用@CachePut?更新數(shù)據(jù)庫和更新緩存
這篇文章主要介紹了使用@CachePut?更新數(shù)據(jù)庫和更新緩存方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
SpringBoot整合sharding-jdbc?實(shí)現(xiàn)分庫分表操作的示例代碼
在Spring?Boot中使用ShardingSphere的Sharding-JDBC來實(shí)現(xiàn)數(shù)據(jù)庫的分庫分表是一個常見的需求,下面就拉具體介紹一下實(shí)現(xiàn)步驟,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-04-04
實(shí)現(xiàn)quartz定時(shí)器及quartz定時(shí)器原理介紹
Quartz是一個大名鼎鼎的Java版開源定時(shí)調(diào)度器,功能強(qiáng)悍,使用方便,下面我們看看如何使用它2013-12-12
JpaRepository如何實(shí)現(xiàn)增刪改查并進(jìn)行單元測試
這篇文章主要介紹了JpaRepository如何實(shí)現(xiàn)增刪改查并進(jìn)行單元測試,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11

