欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ中的prefetch_count參數(shù)詳解

 更新時(shí)間:2023年11月27日 14:21:19   作者:Throwable文摘  
這篇文章主要介紹了RabbitMQ中的prefetch_count參數(shù)用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

前提

在某一次用戶(hù)標(biāo)簽服務(wù)中大量用到異步流程,使用了RabbitMQ進(jìn)行解耦。

其中,為了提高消費(fèi)者的處理效率針對(duì)了不同節(jié)點(diǎn)任務(wù)的消費(fèi)者線程數(shù)和prefetch_count參數(shù)都做了調(diào)整和測(cè)試,得到一個(gè)相對(duì)合理的組合。

這里深入分析一下prefetch_count參數(shù)在RabbitMQ中的作用。

prefetch_count參數(shù)的含義

先從AMQPAdvanced Message Queuing Protocol,即高級(jí)消息隊(duì)列協(xié)議,RabbitMQ實(shí)現(xiàn)了此協(xié)議的0-9-1版本的大部分內(nèi)容)和RabbitMQ的具體實(shí)現(xiàn)去理解prefetch_count參數(shù)的含義,可以查閱對(duì)應(yīng)的文檔(見(jiàn)文末參考資料)。

AMQP 0-9-1定義了basic.qos方法去限制消費(fèi)者基于某一個(gè)Channel或者Connection上未進(jìn)行ack的最大消息數(shù)量上限。

basic.qos方法支持兩個(gè)參數(shù):

  • global:布爾值。
  • prefetch_count:整數(shù)。

這兩個(gè)參數(shù)在AMQP 0-9-1定義中的含義和RabbitMQ具體實(shí)現(xiàn)時(shí)有所不同,見(jiàn)下表:

global參數(shù)值AMQP 0-9-1中prefetch_count參數(shù)的含義RabbitMQ中prefetch_count參數(shù)的含義
falseprefetch_count值在當(dāng)前Channel的所有消費(fèi)者共享prefetch_count對(duì)于基于當(dāng)前Channel創(chuàng)建的消費(fèi)者生效
trueprefetch_count值在當(dāng)前Connection的所有消費(fèi)者共享prefetch_count值在當(dāng)前Channel的所有消費(fèi)者共享

或者用簡(jiǎn)潔的英文表格理解:

globalprefetch_count in AMQP 0-9-1prefetch_count in RabbitMQ
falsePer channel limitPer customer limit
truePer connection limitPer channel limit

這里畫(huà)一個(gè)圖理解一下:

上圖僅僅為了區(qū)分協(xié)議本身和RabbitMQ中實(shí)現(xiàn)的不同,接著說(shuō)說(shuō)prefetch_count對(duì)于消費(fèi)者(線程)和待消費(fèi)消息的作用。

假定一個(gè)前提:RabbitMQ客戶(hù)端從RabbitMQ服務(wù)端獲取到隊(duì)列消息的速度比消費(fèi)者線程消費(fèi)速度快,目前有兩個(gè)消費(fèi)者線程共用一個(gè)Channel實(shí)例。

當(dāng)global參數(shù)為false時(shí)候,效果如下:

而當(dāng)global參數(shù)為true時(shí)候,效果如下:

在消費(fèi)者線程處理速度遠(yuǎn)低于RabbitMQ客戶(hù)端從RabbitMQ服務(wù)端獲取到隊(duì)列消息的速度的場(chǎng)景下,prefetch_count條未進(jìn)行ack的消息會(huì)暫時(shí)存放在一個(gè)隊(duì)列(準(zhǔn)確來(lái)說(shuō)是阻塞隊(duì)列,然后阻塞隊(duì)列中的消息任務(wù)會(huì)流轉(zhuǎn)到一個(gè)列表中遍歷回調(diào)消費(fèi)者句柄,見(jiàn)下一節(jié)的源碼分析)中等待被消費(fèi)者處理。

這部分消息會(huì)占據(jù)JVM的堆內(nèi)存,所以在性能調(diào)優(yōu)或者設(shè)定應(yīng)用程序的初始化和最大堆內(nèi)存的時(shí)候,如果剛好用到RabbitMQ的消費(fèi)者,必須要考慮這些"預(yù)取消息"的內(nèi)存占用量。

不過(guò)值得注意的是:prefetch_countRabbitMQ服務(wù)端的參數(shù),它的設(shè)置值或者快照都不會(huì)存放在RabbitMQ客戶(hù)端」

同時(shí)需要注意prefetch_count生效的條件和特性(從參數(shù)設(shè)置的一些demo和源碼上感知):

  • prefetch_count參數(shù)僅僅在basic.consumeautoAck參數(shù)設(shè)置為false的前提下才生效,也就是不能使用自動(dòng)確認(rèn),自動(dòng)確認(rèn)的消息沒(méi)有辦法限流。
  • basic.consume如果在非自動(dòng)確認(rèn)模式下忘記了手動(dòng)調(diào)用basic.ack,那么prefetch_count正是未ack消息數(shù)量的最大上限。
  • prefetch_count是由RabbitMQ服務(wù)端控制,一般情況下能保證各個(gè)消費(fèi)者線程中的未ack消息分發(fā)是均衡的,這點(diǎn)筆者猜測(cè)是consumerTag起到了關(guān)鍵作用。

RabbitMQ客戶(hù)端中prefetch_count源碼跟蹤

編寫(xiě)本文的時(shí)候引入的RabbitMQ客戶(hù)端版本為:com.rabbitmq:amqp-client:5.9.0

上面說(shuō)了這么多都只是根據(jù)官方的文檔或者博客中的理論依據(jù)進(jìn)行分析,其實(shí)更加根本的分析方法是直接閱讀RabbitMQJava客戶(hù)端源碼,主要是針對(duì)basic.qosbasic.consume兩個(gè)方法,對(duì)應(yīng)的是com.rabbitmq.client.impl.ChannelN#basicQos()com.rabbitmq.client.impl.ChannelN#basicConsume()兩個(gè)方法。

先看ChannelN#basicQos()

這里的basicQos()方法多了一個(gè)prefetchSize參數(shù),用于限制分發(fā)內(nèi)容的大小上限,默認(rèn)值0代表無(wú)限制,而prefetchCount的取值范圍是[0,65535],取值為0也是代表無(wú)限制。

這里的ChannelN#basicQos()實(shí)現(xiàn)中直接封裝basic.qos方法參數(shù)進(jìn)行一次RPC調(diào)用,意味著直接更變RabbitMQ服務(wù)端的配置,即時(shí)生效,同時(shí)參數(shù)值完全沒(méi)有保存在客戶(hù)端代碼中,印證了前面一節(jié)的結(jié)論。

接著看ChannelN#basicConsume()方法:

上圖已經(jīng)把關(guān)鍵部分用紅圈圈出,因?yàn)檎麄€(gè)消息消費(fèi)過(guò)程是異步的,涉及太多的類(lèi)和方法,這里不全量貼出,整理了一個(gè)流程圖:

整個(gè)消息消費(fèi)過(guò)程,prefetch_count參數(shù)并未出現(xiàn)在客戶(hù)端代碼中,又再次印證了前面一節(jié)的結(jié)論,即prefetch_count參數(shù)的行為和作用完全由RabbitMQ服務(wù)端控制。

而最終Customer或者常用的DefaultCustomer句柄是在WorkPoolRunnable中回調(diào)的,這類(lèi)任務(wù)的執(zhí)行線程來(lái)自于ConsumerWorkService內(nèi)部的線程池,而這個(gè)線程池又使用了Executors.newFixedThreadPool()去構(gòu)建,使用了默認(rèn)的線程工廠類(lèi),因此在Customer#handleDelivery()方法內(nèi)部打印的線程名稱(chēng)的樣子是pool-1-thread-*

這里VariableLinkedBlockingQueue就是前一節(jié)中的message queue的原型

prefetch_count參數(shù)使用

設(shè)置prefetch_count參數(shù)比較簡(jiǎn)單,就是調(diào)用Channel#basicQos()方法:

public class RabbitQos {
 
    static String QUEUE = "qos.test";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.basicQos(2);
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1------" + Thread.currentThread().getName());
                sleep();
            }
        });
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("2------" + Thread.currentThread().getName());
                sleep();
            }
        });
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
        }
        sleep();
    }
 
    private static void sleep() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception ignore) {
 
        }
    }
}

上面是原生的amqp-client的寫(xiě)法,如果使用了spring-amqpspring-boot-starter-amqp),可以通過(guò)配置文件中的spring.rabbitmq.listener.direct.prefetch屬性指定所有消費(fèi)者線程的prefetch_count,如果要針對(duì)部分消費(fèi)者線程進(jìn)行該屬性的設(shè)置,則需要針對(duì)RabbitListenerContainerFactory進(jìn)行改造。

prefetch_count參數(shù)最佳實(shí)踐

關(guān)于prefetch_count參數(shù)的設(shè)置,RabbitMQ官方有一篇文章進(jìn)行了分析:《Finding bottlenecks with RabbitMQ 3.3》。

該文章分析了消息流控的整個(gè)流程,其中提到了prefetch_count參數(shù)的一些指標(biāo):

這里指出了,如果prefetch_count的值超過(guò)了30,那么網(wǎng)絡(luò)帶寬限制開(kāi)始占主導(dǎo)地位,此時(shí)進(jìn)一步增加prefetch_count的值就會(huì)變得收效甚微。

也就是說(shuō),「官方是建議把prefetch_count設(shè)置為30。

這里再參看一下spring-boot-starter-amqp中對(duì)此參數(shù)定義的默認(rèn)值,具體是AbstractMessageListenerContainer中的DEFAULT_PREFETCH_COUNT

如果沒(méi)有通過(guò)spring.rabbitmq.listener.direct.prefetch進(jìn)行覆蓋,那么使用spring-boot-starter-amqp中的注解定義的消費(fèi)者線程中設(shè)置的prefetch_count就是250。

筆者認(rèn)為,應(yīng)該綜合帶寬、每條消息的數(shù)據(jù)報(bào)大小、消費(fèi)者線程處理的速率等等角度去考慮prefetch_count的設(shè)置。

總結(jié)如下(個(gè)人經(jīng)驗(yàn)僅供參考):

  • 當(dāng)消費(fèi)者線程的處理速度十分慢,而隊(duì)列的消息量十分少的場(chǎng)景下,可以考慮把prefetch_count設(shè)置為1
  • 當(dāng)隊(duì)列中的每條消息的數(shù)據(jù)報(bào)十分大的時(shí)候,要計(jì)算好客戶(hù)端可以容納的未ack總消息量的內(nèi)存極限,從而設(shè)計(jì)一個(gè)合理的prefetch_count值。
  • 當(dāng)消費(fèi)者線程的處理速度十分快,遠(yuǎn)遠(yuǎn)大于RabbitMQ服務(wù)端的消息分發(fā),在網(wǎng)絡(luò)帶寬充足的前提下,設(shè)置可以把prefetch_count值設(shè)置為0,不做任何的消息流控。
  • 一般場(chǎng)景下,建議使用RabbitMQ官方的建議值30或者spring-boot-starter-amqp中的默認(rèn)值250

總結(jié)

  • prefetch_countRabbitMQ服務(wù)端的參數(shù),設(shè)置后即時(shí)生效。
  • prefetch_count對(duì)于AMQP-0-9-1中的定義與RabbitMQ中的實(shí)現(xiàn)不完全相同。
  • prefetch_count值設(shè)置建議使用框架提供的默認(rèn)值或者通過(guò)分組實(shí)驗(yàn)結(jié)合數(shù)據(jù)報(bào)大小進(jìn)行計(jì)算和評(píng)估出一個(gè)合理值。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論