欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ高級應用之消費端限流策略basicQos詳解

 更新時間:2023年08月28日 10:08:26   作者:瘋狂的帆  
這篇文章主要介紹了RabbitMQ高級應用之消費端限流策略basicQos詳解,高并發(fā)情況下,隊列里面一瞬間就就積累了上萬條數(shù)據(jù),但是消費者無法同時處理這么多請求,這種場景下我們就需要對消費端進行限流,需要的朋友可以參考下

業(yè)務場景

高并發(fā)情況下,隊列里面一瞬間就就積累了上萬條數(shù)據(jù),但是消費者無法同時處理這么多請求,這個時候當我們打開客戶端,瞬間就有巨量的信息給推送過來

但是客戶端是沒有辦法同時處理這么多數(shù)據(jù)的,結果就是消費者(客戶端)掛掉了…

這種場景下我們就需要對消費端進行限流

限流策略實現(xiàn)

限流策略關鍵代碼:

channel.basicQos(); 

編寫生產(chǎn)者

// 生產(chǎn)者
public class Producer {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("消費端限流策略—測試數(shù)據(jù):" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

編寫消費者1

// 消費者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

編寫消費者2

// 消費者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_limit_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /**  設置限流機制
         *  param1: prefetchSize,消息本身的大小 如果設置為0  那么表示對消息本身的大小不限制
         *  param2: prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
         *  param3:global,是否將上面的設置應用于整個通道,false表示只應用于當前消費者
         */
        channel.basicQos(0, 5, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2接收到信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

運行結果

在這里插入圖片描述

在這里插入圖片描述

小結

  1. 限流的核心代碼就是 channel.basicQos();
  2. 限流情況 ack 不能設置自動簽收,一定要手動簽收 channel.basicQos()
/**
     * @param prefetchSize maximum amount of content (measured in
     * octets) that the server will deliver, 0 if unlimited
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @param global true if the settings should be applied to the
     * entire channel rather than each consumer
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

該方法的作用是:進行消費端的限流

  • param1:prefetchSize,消息本身的大小 如果設置為0 那么表示對消息本身的大小不限制
  • param2:prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
  • param3:global,是否將上面的設置應用于整個通道
    • false:表示只應用于當前消費者
    • true:表示當前通道的所有消費者都應用這個限流策略

到此這篇關于RabbitMQ高級應用之消費端限流策略basicQos詳解的文章就介紹到這了,更多相關RabbitMQ消費端限流策略basicQos內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot使用validation-api實現(xiàn)對枚舉類參數(shù)校驗的方法

    SpringBoot使用validation-api實現(xiàn)對枚舉類參數(shù)校驗的方法

    這篇文章主要介紹了SpringBoot使用validation-api實現(xiàn)對枚舉類參數(shù)校驗,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • springboot結合vue實現(xiàn)增刪改查及分頁查詢

    springboot結合vue實現(xiàn)增刪改查及分頁查詢

    本文主要介紹了springboot結合vue實現(xiàn)增刪改查及分頁查詢,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-09-09
  • java8 多個list對象用lambda求差集操作

    java8 多個list對象用lambda求差集操作

    這篇文章主要介紹了java8 多個list對象用lambda求差集操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • tk.mybatis如何擴展自己的通用mapper

    tk.mybatis如何擴展自己的通用mapper

    這篇文章主要介紹了tk.mybatis如何擴展自己的通用mapper操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 解讀Java報錯輸出的信息究竟是什么

    解讀Java報錯輸出的信息究竟是什么

    Java報錯輸出的信息主要包括異常的主要描述信息和當前線程的棧幀信息,棧幀是虛擬機棧的基本存儲單元,主要由局部變量表、操作數(shù)棧和幀數(shù)據(jù)三部分組成,局部變量表用于存放方法的參數(shù)和局部變量,操作數(shù)棧用于保存計算過程中產(chǎn)生的中間結果
    2024-12-12
  • 使用Apache Ignite實現(xiàn)Java數(shù)據(jù)網(wǎng)格

    使用Apache Ignite實現(xiàn)Java數(shù)據(jù)網(wǎng)格

    今天我們來探討如何使用Apache Ignite來實現(xiàn)Java數(shù)據(jù)網(wǎng)格,Apache Ignite是一個高性能的內(nèi)存計算平臺,它提供了分布式緩存、數(shù)據(jù)網(wǎng)格和計算功能,可以顯著提高大規(guī)模應用的數(shù)據(jù)處理性能,感興趣的小伙伴跟著小編一起來看看吧
    2024-08-08
  • IDEA 2020.1.1好用的plugins插件推薦

    IDEA 2020.1.1好用的plugins插件推薦

    這篇文章主要介紹了IDEA 2020.1.1好用的plugins插件推薦,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • SpringBoot2 task scheduler 定時任務調(diào)度器四種方式

    SpringBoot2 task scheduler 定時任務調(diào)度器四種方式

    這篇文章主要介紹了SpringBoot2 task scheduler 定時任務調(diào)度器四種方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03
  • SpringBoot實現(xiàn)反向代理的示例代碼

    SpringBoot實現(xiàn)反向代理的示例代碼

    本文主要介紹了SpringBoot實現(xiàn)反向代理的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • JDK集合源碼之解析TreeMap(二)

    JDK集合源碼之解析TreeMap(二)

    下面小編就為大家?guī)硪黄獪\談java中的TreeMap 排序與TreeSet 排序。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2021-07-07

最新評論