欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ實(shí)現(xiàn)消費(fèi)端限流的步驟

 更新時(shí)間:2024年03月08日 11:36:26   作者:思靜語(yǔ)  
消費(fèi)者端限流的主要目的是控制消費(fèi)者每次從 RabbitMQ 中獲取的消息數(shù)量,從而實(shí)現(xiàn)消息處理的流量控制,這篇文章主要介紹了RabbitMQ如何實(shí)現(xiàn)消費(fèi)端限流,需要的朋友可以參考下

概述

在 RabbitMQ 中,可以通過(guò)消費(fèi)者端限流(Consumer Prefetch)來(lái)控制消費(fèi)端處理消息的速度,以避免消費(fèi)端處理能力不足或處理過(guò)慢而導(dǎo)致消息堆積。消費(fèi)者端限流的主要目的是控制消費(fèi)者每次從 RabbitMQ 中獲取的消息數(shù)量,從而實(shí)現(xiàn)消息處理的流量控制。
RabbitMQ 提供了一種 QOS(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息還未被消費(fèi)確認(rèn),則不進(jìn)行新消息的消費(fèi)。
RabbitMQ 為我們提供了三種機(jī)制
● 對(duì)內(nèi)存和磁盤(pán)使用量設(shè)置閾值
● 于credit flow 的流控機(jī)制
● QoS保證機(jī)制
channel.basicQos()
channel.basicQos(int prefetchSize,int prefetchCount,boolean global)
一定要注意的是,如果做限流,那么no_ask是要設(shè)置為false,也就是手工簽收而不是自動(dòng)簽收的情況下才可以做限流。
參數(shù):
prefetchSize:消息的大小
prefetchCount:會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,即一旦有N個(gè)消息還沒(méi)有ack,則該consumer將block掉,直到有消息ack
global:是否將上面設(shè)置應(yīng)用于channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是channel級(jí)別的還是consumer級(jí)別
注意,prefetchSize和golobal參數(shù)還沒(méi)有實(shí)現(xiàn)。
Channel的詳細(xì)介紹:
ConnectionFactory、Connection、Channel都是RabbitMQ對(duì)外提供的API中最基本的對(duì)象。
Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。
ConnectionFactory如名稱,是客戶端與broker的tcp連接工廠,負(fù)責(zé)根據(jù)uri創(chuàng)建Connection。
Channel是我們與RabbitMQ打交道的最重要的一個(gè)接口,我們大部分的業(yè)務(wù)操作是在Channel這個(gè)接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發(fā)布消息等。如果每一次訪問(wèn)RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開(kāi)銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊,AMQP method包含了channel id幫助客戶端和message broker識(shí)別channel,所以channel之間是完全隔離的。Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開(kāi)銷。
注意,rabbitmq提供了服務(wù)質(zhì)量保障功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息未被確認(rèn),不進(jìn)行消費(fèi)新的消息。也就是說(shuō),我們要使用非自動(dòng)ack

@Configuration
public class DirectRabbitConfig {
    public static final String DEAD_LETTER_EXCHANGE = "dead.latter.exchange";
    public static final String DEAD_LETTER_QUEUE = "dead.latter.queue";
    public static final String DEAD_LETTER_KEY = "dead.latter.key";
    //隊(duì)列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        //實(shí)例化隊(duì)列時(shí)各個(gè)參數(shù)的含義如下:
        //name:隊(duì)列名稱
        // durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤(pán)上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效
        // exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級(jí)高于durable
        // autoDelete:默認(rèn)是false,是否自動(dòng)刪除,當(dāng)沒(méi)有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除。
        //一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)false(消息和交換機(jī)也可以持久化,但是消息持久化的前提是需要和隊(duì)列,交換機(jī)持久化一起使用)
        //為業(yè)務(wù)隊(duì)列綁定一個(gè)死信交換機(jī),當(dāng)業(yè)務(wù)隊(duì)列里的消息過(guò)期了就被轉(zhuǎn)發(fā)到死信交換機(jī),再由死信交換機(jī)發(fā)給死信隊(duì)列處理
//        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
//        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這里聲明當(dāng)前隊(duì)列的死信路由key
//        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        //5000毫秒
//        args.put("x-message-ttl", 5000);
//        return new Queue("TestDirectQueue",false,false,false,args);
        return new Queue("TestDirectQueue",false,false,false);
    }
    //Direct交換機(jī) 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //綁定  將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
    /**
     * ========================死信隊(duì)列==================================
     */
    // 聲明死信Exchange
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 聲明死信隊(duì)列A
    @Bean
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE);
    }
    @Bean
    public Binding deadLetterBindingA(){
        return BindingBuilder.bind(deadLetterQueueA()).to(deadLetterExchange()).with(DEAD_LETTER_KEY);
    }
}
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽(tīng)的隊(duì)列名稱 TestDirectQueue
public class DirectReceiver {
     Integer index = 0;
    @RabbitHandler
    public void process(Channel channel,String msg) throws Exception {
        ++index;
        channel.basicQos(0, 1, false);
        //設(shè)置非自動(dòng)ack
        DefaultConsumer consumer = new DefaultConsumer(channel);
        channel.basicConsume("TestDirectQueue", false,consumer);
        //假設(shè)業(yè)務(wù)處理需要3秒,那么當(dāng)消費(fèi)者接受到消息的時(shí)候,只處理一條,且要處理3秒,那么在服務(wù)器堆積的多條信息就不會(huì)瘋狂涌入
        Thread.sleep(3000);
        System.out.println(" DirectReceiver消費(fèi)者收到消息  : " + msg + ",第" +index+ "條"+"======"+ new Date().toString());
    }
}

RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流的步驟

1.設(shè)置消費(fèi)者端的預(yù)取值(Prefetch Count):
在創(chuàng)建消費(fèi)者時(shí),可以通過(guò)設(shè)置 basicQos(prefetchCount) 方法來(lái)指定消費(fèi)者端的預(yù)取值,即每次從 RabbitMQ 中預(yù)取的消息數(shù)量。

2.確保消費(fèi)者端開(kāi)啟手動(dòng)應(yīng)答模式:
在設(shè)置預(yù)取值之前,確保消費(fèi)者端已經(jīng)開(kāi)啟了手動(dòng)應(yīng)答模式(manual ack mode),這樣消費(fèi)者可以自主控制何時(shí)應(yīng)答消息。

3.消費(fèi)者端處理消息時(shí)進(jìn)行手動(dòng)應(yīng)答:
當(dāng)消費(fèi)者端接收到消息后,在處理完消息之后,需要顯式地發(fā)送應(yīng)答(ack)給 RabbitMQ,表示該消息已經(jīng)被消費(fèi)。這樣,消費(fèi)者才能繼續(xù)接收下一批消息。
下面是一個(gè)簡(jiǎn)單的 Java 示例代碼,演示了如何在 RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流:

import com.rabbitmq.client.*;
public class Consumer {
    private final static String QUEUE_NAME = "queue_name";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 5; // 設(shè)置預(yù)取值為 5
        channel.basicQos(prefetchCount);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received: " + message);
            // 模擬消息處理
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手動(dòng)應(yīng)答消息
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

在上述代碼中,我們首先創(chuàng)建了連接和信道,并聲明了一個(gè)名為 “queue_name” 的隊(duì)列。然后,通過(guò) channel.basicQos(prefetchCount) 方法設(shè)置了消費(fèi)者端的預(yù)取值為 5。接著,我們定義了一個(gè) DeliverCallback 回調(diào)函數(shù),在其中處理消息并手動(dòng)應(yīng)答。最后,通過(guò) channel.basicConsume() 方法啟動(dòng)消費(fèi)者端。
通過(guò)設(shè)置預(yù)取值和手動(dòng)應(yīng)答,消費(fèi)者端可以控制自身處理消息的速度,有效地實(shí)現(xiàn)消費(fèi)端的限流。希望這個(gè)示例能幫助您理解如何在 RabbitMQ 中實(shí)現(xiàn)消費(fèi)端限流!

到此這篇關(guān)于RabbitMQ如何實(shí)現(xiàn)消費(fèi)端限流的文章就介紹到這了,更多相關(guān)RabbitMQ消費(fèi)端限流內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java中流的使用

    java中流的使用

    本文主要介紹了java中流的使用以及分類。具有一定的參考價(jià)值,下面跟著小編一起來(lái)看下吧
    2017-01-01
  • Java?中?hashCode()?與?equals()?的關(guān)系(面試)

    Java?中?hashCode()?與?equals()?的關(guān)系(面試)

    這篇文章主要介紹了Java中hashCode()與equals()的關(guān)系,ava中hashCode()和equals()的關(guān)系是面試中的??键c(diǎn),文章對(duì)hashCode與equals的關(guān)系做出詳解,需要的小伙伴可以參考一下
    2022-09-09
  • springboot使用RedisRepository操作數(shù)據(jù)的實(shí)現(xiàn)

    springboot使用RedisRepository操作數(shù)據(jù)的實(shí)現(xiàn)

    本文主要介紹了springboot使用RedisRepository操作數(shù)據(jù)的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-05-05
  • 使用@CachePut?更新數(shù)據(jù)庫(kù)和更新緩存

    使用@CachePut?更新數(shù)據(jù)庫(kù)和更新緩存

    這篇文章主要介紹了使用@CachePut?更新數(shù)據(jù)庫(kù)和更新緩存方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Intellij?IDEA如何修改配置文件位置

    Intellij?IDEA如何修改配置文件位置

    這篇文章主要介紹了Intellij?IDEA--修改配置文件位置,文章末尾給大家介紹了Intellij?IDEA--宏的用法記錄操作過(guò)程,對(duì)此文感興趣的朋友跟隨小編一起看看吧
    2022-08-08
  • 深入了解MyBatis參數(shù)

    深入了解MyBatis參數(shù)

    今天小編就為大家分享一篇關(guān)于深入了解MyBatis參數(shù),小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2018-12-12
  • Java基于redis實(shí)現(xiàn)分布式鎖

    Java基于redis實(shí)現(xiàn)分布式鎖

    系統(tǒng)的不斷擴(kuò)大,分布式鎖是最基本的保障。今天我們介紹通過(guò)redis實(shí)現(xiàn)分布式鎖,感興趣的朋友可以參考下
    2021-05-05
  • SpringBoot整合sharding-jdbc?實(shí)現(xiàn)分庫(kù)分表操作的示例代碼

    SpringBoot整合sharding-jdbc?實(shí)現(xiàn)分庫(kù)分表操作的示例代碼

    在Spring?Boot中使用ShardingSphere的Sharding-JDBC來(lái)實(shí)現(xiàn)數(shù)據(jù)庫(kù)的分庫(kù)分表是一個(gè)常見(jiàn)的需求,下面就拉具體介紹一下實(shí)現(xiàn)步驟,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2025-04-04
  • 實(shí)現(xiàn)quartz定時(shí)器及quartz定時(shí)器原理介紹

    實(shí)現(xiàn)quartz定時(shí)器及quartz定時(shí)器原理介紹

    Quartz是一個(gè)大名鼎鼎的Java版開(kāi)源定時(shí)調(diào)度器,功能強(qiáng)悍,使用方便,下面我們看看如何使用它
    2013-12-12
  • JpaRepository如何實(shí)現(xiàn)增刪改查并進(jìn)行單元測(cè)試

    JpaRepository如何實(shí)現(xiàn)增刪改查并進(jìn)行單元測(cè)試

    這篇文章主要介紹了JpaRepository如何實(shí)現(xiàn)增刪改查并進(jìn)行單元測(cè)試,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11

最新評(píng)論