RabbitMQ中的prefetch_count參數(shù)詳解
前提
在某一次用戶(hù)標(biāo)簽服務(wù)中大量用到異步流程,使用了RabbitMQ
進(jìn)行解耦。
其中,為了提高消費(fèi)者的處理效率針對(duì)了不同節(jié)點(diǎn)任務(wù)的消費(fèi)者線程數(shù)和prefetch_count
參數(shù)都做了調(diào)整和測(cè)試,得到一個(gè)相對(duì)合理的組合。
這里深入分析一下prefetch_count
參數(shù)在RabbitMQ
中的作用。
prefetch_count參數(shù)的含義
先從AMQP
(Advanced Message Queuing Protocol
,即高級(jí)消息隊(duì)列協(xié)議,RabbitMQ
實(shí)現(xiàn)了此協(xié)議的0-9-1
版本的大部分內(nèi)容)和RabbitMQ
的具體實(shí)現(xiàn)去理解prefetch_count
參數(shù)的含義,可以查閱對(duì)應(yīng)的文檔(見(jiàn)文末參考資料)。
AMQP 0-9-1
定義了basic.qos
方法去限制消費(fèi)者基于某一個(gè)Channel
或者Connection
上未進(jìn)行ack
的最大消息數(shù)量上限。
basic.qos
方法支持兩個(gè)參數(shù):
global
:布爾值。prefetch_count
:整數(shù)。
這兩個(gè)參數(shù)在AMQP 0-9-1
定義中的含義和RabbitMQ
具體實(shí)現(xiàn)時(shí)有所不同,見(jiàn)下表:
global參數(shù)值 | AMQP 0-9-1中prefetch_count參數(shù)的含義 | RabbitMQ中prefetch_count參數(shù)的含義 |
---|---|---|
false | prefetch_count值在當(dāng)前Channel的所有消費(fèi)者共享 | prefetch_count對(duì)于基于當(dāng)前Channel創(chuàng)建的消費(fèi)者生效 |
true | prefetch_count值在當(dāng)前Connection的所有消費(fèi)者共享 | prefetch_count值在當(dāng)前Channel的所有消費(fèi)者共享 |
或者用簡(jiǎn)潔的英文表格理解:
global | prefetch_count in AMQP 0-9-1 | prefetch_count in RabbitMQ |
---|---|---|
false | Per channel limit | Per customer limit |
true | Per connection limit | Per channel limit |
這里畫(huà)一個(gè)圖理解一下:
上圖僅僅為了區(qū)分協(xié)議本身和RabbitMQ
中實(shí)現(xiàn)的不同,接著說(shuō)說(shuō)prefetch_count
對(duì)于消費(fèi)者(線程)和待消費(fèi)消息的作用。
假定一個(gè)前提:RabbitMQ
客戶(hù)端從RabbitMQ
服務(wù)端獲取到隊(duì)列消息的速度比消費(fèi)者線程消費(fèi)速度快,目前有兩個(gè)消費(fèi)者線程共用一個(gè)Channel
實(shí)例。
當(dāng)global
參數(shù)為false
時(shí)候,效果如下:
而當(dāng)global
參數(shù)為true
時(shí)候,效果如下:
在消費(fèi)者線程處理速度遠(yuǎn)低于RabbitMQ
客戶(hù)端從RabbitMQ
服務(wù)端獲取到隊(duì)列消息的速度的場(chǎng)景下,prefetch_count
條未進(jìn)行ack
的消息會(huì)暫時(shí)存放在一個(gè)隊(duì)列(準(zhǔn)確來(lái)說(shuō)是阻塞隊(duì)列,然后阻塞隊(duì)列中的消息任務(wù)會(huì)流轉(zhuǎn)到一個(gè)列表中遍歷回調(diào)消費(fèi)者句柄,見(jiàn)下一節(jié)的源碼分析)中等待被消費(fèi)者處理。
這部分消息會(huì)占據(jù)JVM
的堆內(nèi)存,所以在性能調(diào)優(yōu)或者設(shè)定應(yīng)用程序的初始化和最大堆內(nèi)存的時(shí)候,如果剛好用到RabbitMQ
的消費(fèi)者,必須要考慮這些"預(yù)取消息"的內(nèi)存占用量。
不過(guò)值得注意的是:「prefetch_count
是RabbitMQ
服務(wù)端的參數(shù),它的設(shè)置值或者快照都不會(huì)存放在RabbitMQ
客戶(hù)端」。
同時(shí)需要注意prefetch_count
生效的條件和特性(從參數(shù)設(shè)置的一些demo
和源碼上感知):
prefetch_count
參數(shù)僅僅在basic.consume
的autoAck
參數(shù)設(shè)置為false
的前提下才生效,也就是不能使用自動(dòng)確認(rèn),自動(dòng)確認(rèn)的消息沒(méi)有辦法限流。basic.consume
如果在非自動(dòng)確認(rèn)模式下忘記了手動(dòng)調(diào)用basic.ack
,那么prefetch_count
正是未ack
消息數(shù)量的最大上限。prefetch_count
是由RabbitMQ
服務(wù)端控制,一般情況下能保證各個(gè)消費(fèi)者線程中的未ack
消息分發(fā)是均衡的,這點(diǎn)筆者猜測(cè)是consumerTag
起到了關(guān)鍵作用。
RabbitMQ客戶(hù)端中prefetch_count源碼跟蹤
編寫(xiě)本文的時(shí)候引入的RabbitMQ客戶(hù)端版本為:com.rabbitmq:amqp-client:5.9.0
上面說(shuō)了這么多都只是根據(jù)官方的文檔或者博客中的理論依據(jù)進(jìn)行分析,其實(shí)更加根本的分析方法是直接閱讀RabbitMQ
的Java
客戶(hù)端源碼,主要是針對(duì)basic.qos
和basic.consume
兩個(gè)方法,對(duì)應(yīng)的是com.rabbitmq.client.impl.ChannelN#basicQos()
和com.rabbitmq.client.impl.ChannelN#basicConsume()
兩個(gè)方法。
先看ChannelN#basicQos()
:
這里的basicQos()
方法多了一個(gè)prefetchSize
參數(shù),用于限制分發(fā)內(nèi)容的大小上限,默認(rèn)值0
代表無(wú)限制,而prefetchCount
的取值范圍是[0,65535]
,取值為0
也是代表無(wú)限制。
這里的ChannelN#basicQos()
實(shí)現(xiàn)中直接封裝basic.qos
方法參數(shù)進(jìn)行一次RPC
調(diào)用,意味著直接更變RabbitMQ
服務(wù)端的配置,即時(shí)生效,同時(shí)參數(shù)值完全沒(méi)有保存在客戶(hù)端代碼中,印證了前面一節(jié)的結(jié)論。
接著看ChannelN#basicConsume()
方法:
上圖已經(jīng)把關(guān)鍵部分用紅圈圈出,因?yàn)檎麄€(gè)消息消費(fèi)過(guò)程是異步的,涉及太多的類(lèi)和方法,這里不全量貼出,整理了一個(gè)流程圖:
整個(gè)消息消費(fèi)過(guò)程,prefetch_count
參數(shù)并未出現(xiàn)在客戶(hù)端代碼中,又再次印證了前面一節(jié)的結(jié)論,即prefetch_count
參數(shù)的行為和作用完全由RabbitMQ
服務(wù)端控制。
而最終Customer
或者常用的DefaultCustomer
句柄是在WorkPoolRunnable
中回調(diào)的,這類(lèi)任務(wù)的執(zhí)行線程來(lái)自于ConsumerWorkService
內(nèi)部的線程池,而這個(gè)線程池又使用了Executors.newFixedThreadPool()
去構(gòu)建,使用了默認(rèn)的線程工廠類(lèi),因此在Customer#handleDelivery()
方法內(nèi)部打印的線程名稱(chēng)的樣子是pool-1-thread-*
。
這里VariableLinkedBlockingQueue就是前一節(jié)中的message queue的原型
prefetch_count參數(shù)使用
設(shè)置prefetch_count
參數(shù)比較簡(jiǎn)單,就是調(diào)用Channel#basicQos()
方法:
public class RabbitQos { static String QUEUE = "qos.test"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, true, false, false, null); channel.basicQos(2); channel.basicConsume("qos.test", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("1------" + Thread.currentThread().getName()); sleep(); } }); channel.basicConsume("qos.test", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("2------" + Thread.currentThread().getName()); sleep(); } }); for (int i = 0; i < 20; i++) { channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes()); } sleep(); } private static void sleep() { try { Thread.sleep(Long.MAX_VALUE); } catch (Exception ignore) { } } }
上面是原生的amqp-client
的寫(xiě)法,如果使用了spring-amqp
(spring-boot-starter-amqp
),可以通過(guò)配置文件中的spring.rabbitmq.listener.direct.prefetch
屬性指定所有消費(fèi)者線程的prefetch_count
,如果要針對(duì)部分消費(fèi)者線程進(jìn)行該屬性的設(shè)置,則需要針對(duì)RabbitListenerContainerFactory
進(jìn)行改造。
prefetch_count參數(shù)最佳實(shí)踐
關(guān)于prefetch_count
參數(shù)的設(shè)置,RabbitMQ
官方有一篇文章進(jìn)行了分析:《Finding bottlenecks with RabbitMQ 3.3》。
該文章分析了消息流控的整個(gè)流程,其中提到了prefetch_count
參數(shù)的一些指標(biāo):
這里指出了,如果prefetch_count
的值超過(guò)了30
,那么網(wǎng)絡(luò)帶寬限制開(kāi)始占主導(dǎo)地位,此時(shí)進(jìn)一步增加prefetch_count
的值就會(huì)變得收效甚微。
也就是說(shuō),「官方是建議把prefetch_count
設(shè)置為30
」。
這里再參看一下spring-boot-starter-amqp
中對(duì)此參數(shù)定義的默認(rèn)值,具體是AbstractMessageListenerContainer
中的DEFAULT_PREFETCH_COUNT
:
如果沒(méi)有通過(guò)spring.rabbitmq.listener.direct.prefetch
進(jìn)行覆蓋,那么使用spring-boot-starter-amqp
中的注解定義的消費(fèi)者線程中設(shè)置的prefetch_count
就是250
。
筆者認(rèn)為,應(yīng)該綜合帶寬、每條消息的數(shù)據(jù)報(bào)大小、消費(fèi)者線程處理的速率等等角度去考慮prefetch_count
的設(shè)置。
總結(jié)如下(個(gè)人經(jīng)驗(yàn)僅供參考):
- 當(dāng)消費(fèi)者線程的處理速度十分慢,而隊(duì)列的消息量十分少的場(chǎng)景下,可以考慮把
prefetch_count
設(shè)置為1
。 - 當(dāng)隊(duì)列中的每條消息的數(shù)據(jù)報(bào)十分大的時(shí)候,要計(jì)算好客戶(hù)端可以容納的未
ack
總消息量的內(nèi)存極限,從而設(shè)計(jì)一個(gè)合理的prefetch_count
值。 - 當(dāng)消費(fèi)者線程的處理速度十分快,遠(yuǎn)遠(yuǎn)大于
RabbitMQ
服務(wù)端的消息分發(fā),在網(wǎng)絡(luò)帶寬充足的前提下,設(shè)置可以把prefetch_count
值設(shè)置為0
,不做任何的消息流控。 - 一般場(chǎng)景下,建議使用
RabbitMQ
官方的建議值30
或者spring-boot-starter-amqp
中的默認(rèn)值250
。
總結(jié)
prefetch_count
是RabbitMQ
服務(wù)端的參數(shù),設(shè)置后即時(shí)生效。prefetch_count
對(duì)于AMQP-0-9-1
中的定義與RabbitMQ
中的實(shí)現(xiàn)不完全相同。prefetch_count
值設(shè)置建議使用框架提供的默認(rèn)值或者通過(guò)分組實(shí)驗(yàn)結(jié)合數(shù)據(jù)報(bào)大小進(jìn)行計(jì)算和評(píng)估出一個(gè)合理值。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
MyEclipse設(shè)置Console輸出到文件的實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇MyEclipse設(shè)置Console輸出到文件的實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-07-07Redis監(jiān)聽(tīng)過(guò)期的key實(shí)現(xiàn)流程詳解
本文主要介紹了Redis監(jiān)聽(tīng)key的過(guò)期時(shí)間,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02Eureka注冊(cè)不上或注冊(cè)后IP不對(duì)(多網(wǎng)卡的坑及解決)
這篇文章主要介紹了Eureka注冊(cè)不上或注冊(cè)后IP不對(duì)(多網(wǎng)卡的坑及解決),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11詳解如何獨(dú)立使用ribbon實(shí)現(xiàn)業(yè)務(wù)客戶(hù)端負(fù)載均衡
這篇文章主要為大家介紹了詳解如何獨(dú)立使用ribbon實(shí)現(xiàn)業(yè)務(wù)客戶(hù)端負(fù)載均衡,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06完美解決idea moudle沒(méi)有藍(lán)色的小方塊的問(wèn)題
這篇文章主要介紹了完美解決idea moudle沒(méi)有藍(lán)色的小方塊的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Java面試必備之AQS阻塞隊(duì)列和條件隊(duì)列
我們大概知道AQS就是一個(gè)框架,把很多功能都給實(shí)現(xiàn)了(比如入隊(duì)規(guī)則,喚醒節(jié)點(diǎn)中的線程等),我們?nèi)绻褂玫脑?huà)只需要實(shí)現(xiàn)其中的一些方法(比如tryAcquire等)就行了!這次主要說(shuō)說(shuō)AQS中阻塞隊(duì)列的的入隊(duì)規(guī)則還有條件變量,需要的朋友可以參考下2021-06-06Hadoop集成Spring的使用詳細(xì)教程(快速入門(mén)大數(shù)據(jù))
這篇文章主要介紹了Hadoop集成Spring的使用詳細(xì)教程(快速入門(mén)大數(shù)據(jù)),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01IDEA 2020.1 for Mac 下載安裝配置及出現(xiàn)的問(wèn)題小結(jié)
這篇文章主要介紹了IDEA 2020.1 for Mac 下載安裝配置及出現(xiàn)的問(wèn)題小結(jié),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03