欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ中的prefetch_count參數詳解

 更新時間:2023年11月27日 14:21:19   作者:Throwable文摘  
這篇文章主要介紹了RabbitMQ中的prefetch_count參數用法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

前提

在某一次用戶標簽服務中大量用到異步流程,使用了RabbitMQ進行解耦。

其中,為了提高消費者的處理效率針對了不同節(jié)點任務的消費者線程數和prefetch_count參數都做了調整和測試,得到一個相對合理的組合。

這里深入分析一下prefetch_count參數在RabbitMQ中的作用。

prefetch_count參數的含義

先從AMQPAdvanced Message Queuing Protocol,即高級消息隊列協議,RabbitMQ實現了此協議的0-9-1版本的大部分內容)和RabbitMQ的具體實現去理解prefetch_count參數的含義,可以查閱對應的文檔(見文末參考資料)。

AMQP 0-9-1定義了basic.qos方法去限制消費者基于某一個Channel或者Connection上未進行ack的最大消息數量上限。

basic.qos方法支持兩個參數:

  • global:布爾值。
  • prefetch_count:整數。

這兩個參數在AMQP 0-9-1定義中的含義和RabbitMQ具體實現時有所不同,見下表:

global參數值AMQP 0-9-1中prefetch_count參數的含義RabbitMQ中prefetch_count參數的含義
falseprefetch_count值在當前Channel的所有消費者共享prefetch_count對于基于當前Channel創(chuàng)建的消費者生效
trueprefetch_count值在當前Connection的所有消費者共享prefetch_count值在當前Channel的所有消費者共享

或者用簡潔的英文表格理解:

globalprefetch_count in AMQP 0-9-1prefetch_count in RabbitMQ
falsePer channel limitPer customer limit
truePer connection limitPer channel limit

這里畫一個圖理解一下:

上圖僅僅為了區(qū)分協議本身和RabbitMQ中實現的不同,接著說說prefetch_count對于消費者(線程)和待消費消息的作用。

假定一個前提:RabbitMQ客戶端從RabbitMQ服務端獲取到隊列消息的速度比消費者線程消費速度快,目前有兩個消費者線程共用一個Channel實例。

global參數為false時候,效果如下:

而當global參數為true時候,效果如下:

在消費者線程處理速度遠低于RabbitMQ客戶端從RabbitMQ服務端獲取到隊列消息的速度的場景下,prefetch_count條未進行ack的消息會暫時存放在一個隊列(準確來說是阻塞隊列,然后阻塞隊列中的消息任務會流轉到一個列表中遍歷回調消費者句柄,見下一節(jié)的源碼分析)中等待被消費者處理。

這部分消息會占據JVM的堆內存,所以在性能調優(yōu)或者設定應用程序的初始化和最大堆內存的時候,如果剛好用到RabbitMQ的消費者,必須要考慮這些"預取消息"的內存占用量。

不過值得注意的是:prefetch_countRabbitMQ服務端的參數,它的設置值或者快照都不會存放在RabbitMQ客戶端」。

同時需要注意prefetch_count生效的條件和特性(從參數設置的一些demo和源碼上感知):

  • prefetch_count參數僅僅在basic.consumeautoAck參數設置為false的前提下才生效,也就是不能使用自動確認,自動確認的消息沒有辦法限流。
  • basic.consume如果在非自動確認模式下忘記了手動調用basic.ack,那么prefetch_count正是未ack消息數量的最大上限。
  • prefetch_count是由RabbitMQ服務端控制,一般情況下能保證各個消費者線程中的未ack消息分發(fā)是均衡的,這點筆者猜測是consumerTag起到了關鍵作用。

RabbitMQ客戶端中prefetch_count源碼跟蹤

編寫本文的時候引入的RabbitMQ客戶端版本為:com.rabbitmq:amqp-client:5.9.0

上面說了這么多都只是根據官方的文檔或者博客中的理論依據進行分析,其實更加根本的分析方法是直接閱讀RabbitMQJava客戶端源碼,主要是針對basic.qosbasic.consume兩個方法,對應的是com.rabbitmq.client.impl.ChannelN#basicQos()com.rabbitmq.client.impl.ChannelN#basicConsume()兩個方法。

先看ChannelN#basicQos()

這里的basicQos()方法多了一個prefetchSize參數,用于限制分發(fā)內容的大小上限,默認值0代表無限制,而prefetchCount的取值范圍是[0,65535],取值為0也是代表無限制。

這里的ChannelN#basicQos()實現中直接封裝basic.qos方法參數進行一次RPC調用,意味著直接更變RabbitMQ服務端的配置,即時生效,同時參數值完全沒有保存在客戶端代碼中,印證了前面一節(jié)的結論。

接著看ChannelN#basicConsume()方法:

上圖已經把關鍵部分用紅圈圈出,因為整個消息消費過程是異步的,涉及太多的類和方法,這里不全量貼出,整理了一個流程圖:

整個消息消費過程,prefetch_count參數并未出現在客戶端代碼中,又再次印證了前面一節(jié)的結論,即prefetch_count參數的行為和作用完全由RabbitMQ服務端控制。

而最終Customer或者常用的DefaultCustomer句柄是在WorkPoolRunnable中回調的,這類任務的執(zhí)行線程來自于ConsumerWorkService內部的線程池,而這個線程池又使用了Executors.newFixedThreadPool()去構建,使用了默認的線程工廠類,因此在Customer#handleDelivery()方法內部打印的線程名稱的樣子是pool-1-thread-*

這里VariableLinkedBlockingQueue就是前一節(jié)中的message queue的原型

prefetch_count參數使用

設置prefetch_count參數比較簡單,就是調用Channel#basicQos()方法:

public class RabbitQos {
 
    static String QUEUE = "qos.test";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.basicQos(2);
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1------" + Thread.currentThread().getName());
                sleep();
            }
        });
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("2------" + Thread.currentThread().getName());
                sleep();
            }
        });
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
        }
        sleep();
    }
 
    private static void sleep() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception ignore) {
 
        }
    }
}

上面是原生的amqp-client的寫法,如果使用了spring-amqpspring-boot-starter-amqp),可以通過配置文件中的spring.rabbitmq.listener.direct.prefetch屬性指定所有消費者線程的prefetch_count,如果要針對部分消費者線程進行該屬性的設置,則需要針對RabbitListenerContainerFactory進行改造。

prefetch_count參數最佳實踐

關于prefetch_count參數的設置,RabbitMQ官方有一篇文章進行了分析:《Finding bottlenecks with RabbitMQ 3.3》。

該文章分析了消息流控的整個流程,其中提到了prefetch_count參數的一些指標:

這里指出了,如果prefetch_count的值超過了30,那么網絡帶寬限制開始占主導地位,此時進一步增加prefetch_count的值就會變得收效甚微。

也就是說,「官方是建議把prefetch_count設置為30。

這里再參看一下spring-boot-starter-amqp中對此參數定義的默認值,具體是AbstractMessageListenerContainer中的DEFAULT_PREFETCH_COUNT

如果沒有通過spring.rabbitmq.listener.direct.prefetch進行覆蓋,那么使用spring-boot-starter-amqp中的注解定義的消費者線程中設置的prefetch_count就是250。

筆者認為,應該綜合帶寬、每條消息的數據報大小、消費者線程處理的速率等等角度去考慮prefetch_count的設置。

總結如下(個人經驗僅供參考):

  • 當消費者線程的處理速度十分慢,而隊列的消息量十分少的場景下,可以考慮把prefetch_count設置為1。
  • 當隊列中的每條消息的數據報十分大的時候,要計算好客戶端可以容納的未ack總消息量的內存極限,從而設計一個合理的prefetch_count值。
  • 當消費者線程的處理速度十分快,遠遠大于RabbitMQ服務端的消息分發(fā),在網絡帶寬充足的前提下,設置可以把prefetch_count值設置為0,不做任何的消息流控。
  • 一般場景下,建議使用RabbitMQ官方的建議值30或者spring-boot-starter-amqp中的默認值250

總結

  • prefetch_countRabbitMQ服務端的參數,設置后即時生效。
  • prefetch_count對于AMQP-0-9-1中的定義與RabbitMQ中的實現不完全相同。
  • prefetch_count值設置建議使用框架提供的默認值或者通過分組實驗結合數據報大小進行計算和評估出一個合理值。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • MyEclipse設置Console輸出到文件的實現方法

    MyEclipse設置Console輸出到文件的實現方法

    下面小編就為大家?guī)硪黄狹yEclipse設置Console輸出到文件的實現方法。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • Redis監(jiān)聽過期的key實現流程詳解

    Redis監(jiān)聽過期的key實現流程詳解

    本文主要介紹了Redis監(jiān)聽key的過期時間,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-02-02
  • 淺談Timer和TimerTask與線程的關系

    淺談Timer和TimerTask與線程的關系

    下面小編就為大家?guī)硪黄獪\談Timer和TimerTask與線程的關系。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-03-03
  • Eureka注冊不上或注冊后IP不對(多網卡的坑及解決)

    Eureka注冊不上或注冊后IP不對(多網卡的坑及解決)

    這篇文章主要介紹了Eureka注冊不上或注冊后IP不對(多網卡的坑及解決),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • 詳解如何獨立使用ribbon實現業(yè)務客戶端負載均衡

    詳解如何獨立使用ribbon實現業(yè)務客戶端負載均衡

    這篇文章主要為大家介紹了詳解如何獨立使用ribbon實現業(yè)務客戶端負載均衡,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-06-06
  • 完美解決idea moudle沒有藍色的小方塊的問題

    完美解決idea moudle沒有藍色的小方塊的問題

    這篇文章主要介紹了完美解決idea moudle沒有藍色的小方塊的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Java面試必備之AQS阻塞隊列和條件隊列

    Java面試必備之AQS阻塞隊列和條件隊列

    我們大概知道AQS就是一個框架,把很多功能都給實現了(比如入隊規(guī)則,喚醒節(jié)點中的線程等),我們如果要使用的話只需要實現其中的一些方法(比如tryAcquire等)就行了!這次主要說說AQS中阻塞隊列的的入隊規(guī)則還有條件變量,需要的朋友可以參考下
    2021-06-06
  • Hadoop集成Spring的使用詳細教程(快速入門大數據)

    Hadoop集成Spring的使用詳細教程(快速入門大數據)

    這篇文章主要介紹了Hadoop集成Spring的使用詳細教程(快速入門大數據),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • IDEA 2020.1 for Mac 下載安裝配置及出現的問題小結

    IDEA 2020.1 for Mac 下載安裝配置及出現的問題小結

    這篇文章主要介紹了IDEA 2020.1 for Mac 下載安裝配置及出現的問題小結,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • java開發(fā)接口吞吐量提升10多倍技巧

    java開發(fā)接口吞吐量提升10多倍技巧

    這篇文章主要為大家介紹了java開發(fā)技巧之接口吞吐量提升10多倍的方法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-01-01

最新評論