RabbitMq中channel接口的幾種常用參數(shù)詳解
1. 背景概述
為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者, RabbitMQ 提供了消息確認(rèn)機(jī)制( message acknowledgement), 消費(fèi)者在訂閱隊(duì)列時(shí),可以指定autoAck參數(shù),
- 當(dāng)autoAck 等于false時(shí),RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號后才從內(nèi)存(或者磁盤)中移去消息(實(shí)質(zhì)上是先打上刪除標(biāo)記,之后再刪除) 。
- 當(dāng)autoAck 等于true時(shí),RabbitMQ 會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn), 然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息。
采用消息確認(rèn)機(jī)制后,只要設(shè)置autoAck 參數(shù)為false ,消費(fèi)者就有足夠的時(shí)間處理消息(任務(wù)) ,不用擔(dān)心處理消息過程中消費(fèi)者進(jìn)程掛掉后消息丟失的問題,因?yàn)镽abbitMQ 會(huì)一直等待持有消息直到消費(fèi)者顯式調(diào)用Basic.Ack 命令為止。
當(dāng)autoAck 參數(shù)置為false ,對于RabbitMQ 服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:一部分是等待投遞給消費(fèi)者的消息:一部分是己經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號的消息。如果RabbitMQ 一直沒有收到消費(fèi)者的確認(rèn)信號,并且消費(fèi)此消息的消費(fèi)者己經(jīng)斷開連接, 則RabbitMQ 會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有可能還是原來的那個(gè)消費(fèi)者。
RabbitMQ 不會(huì)為未確認(rèn)的消息設(shè)置過期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否己經(jīng)斷開, 這么設(shè)計(jì)的原因是RabbitMQ 允許消費(fèi)者消費(fèi)一條消息的時(shí)間可以很久很久。
RabbtiMQ 的Web 管理平臺(tái)(15672端口)上可以看到當(dāng)前隊(duì)列中的" Ready" 狀態(tài)和"Unacknowledged" 狀態(tài)的消息數(shù),分別對應(yīng)上文中的等待投遞給消費(fèi)者的消息數(shù)和己經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號的消息數(shù)
也可以通過相應(yīng)的命令來查看上述信息:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
在消費(fèi)者接收到消息后,如果想明確拒絕當(dāng)前的消息而不是確認(rèn),那么應(yīng)該怎么做呢?
RabbitMQ 在2 .0.0 版本開始引入了Basic.Reject 這個(gè)命令,消費(fèi)者客戶端可以調(diào)用與其對應(yīng)的channel.basicReject 方法來告訴RabbitMQ 拒絕這個(gè)消息。
Channel 類中的basicReject 方法定義如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中deliveryTag 可以看作消息的編號,它是一個(gè)64 位的長整型值,最大值是9223372036854775807, 如果requeue 參數(shù)設(shè)置為true ,則RabbitMQ 會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下一個(gè)訂閱的消費(fèi)者, 如果requeue 參數(shù)設(shè)置為false ,則RabbitMQ立即會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消費(fèi)者。
Basic.Reject 命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack 這個(gè)命令, 消費(fèi)者客戶端可以調(diào)用channel.basicNack 方法來實(shí)現(xiàn),方法定義如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中deliveryTag 和requeue 的含義可以參考basicReject 方法。
- multiple 參數(shù)設(shè)置為false 則表示拒絕編號為deliveryTag的這一條消息,這時(shí)候basicNack 和basicReject 方法一樣;
- multiple 參數(shù)設(shè)置為true 則表示拒絕deliveryTag 編號之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息。
注意要點(diǎn):
將channel.basicReject 或者channel.basicNack 中的requeue 設(shè)置為false ,可以啟用" 死信隊(duì)列 "的功能。
死信隊(duì)列可以通過檢測被拒絕或者未送達(dá)的消息來追蹤問題。 對于requeue , AMQP 中還有一個(gè)命令Basic.Recover 具備可重入隊(duì)列的特性。其對應(yīng)的客戶端方法為:
1.Basic.RecoverOk basicRecover() throws IOException;
2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
這個(gè)channel.basicRecover 方法用來請求RabbitMQ 重新發(fā)送還未被確認(rèn)的消息。
如果requeue 參數(shù)設(shè)置為true, 則未被確認(rèn)的消息會(huì)被重新加入到隊(duì)列中, 這樣對于同一條消息來說,可能會(huì)被分配給與之前不同的消費(fèi)者。
如果requeue 參數(shù)設(shè)置為false ,那么同一條消息會(huì)被分配給與之前相同的消費(fèi)者, 默認(rèn)情況下,如果不設(shè)置requeue 這個(gè)參數(shù),相當(dāng)于channel.basicRecover(true) ,即requeue 默認(rèn)為true
2. 通常參數(shù)解釋
- consumerTag :會(huì)話的標(biāo)簽,是固定的 ;
- deliveryTag : 每次接收消息+1,可以做此消息處理通道的名字。
因此 deliveryTag 可以用來回傳告訴 rabbitmq 這個(gè)消息處理成功 清除此消息(basicAck方法)。
3. Channel一些Api解釋
3.1. basicNack 不確認(rèn)消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
簡單理解就是: 不確認(rèn)deliveryTag對應(yīng)的消息
- 參數(shù)1: 消息
- 參數(shù)2: 是否應(yīng)用于多消息
- 參數(shù)3: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列
第二個(gè)參數(shù),怎么理解basic.nack多消息,比如現(xiàn)在有多條消息去調(diào)用這個(gè)nack方法,他是怎么執(zhí)行的?
- 有個(gè)先后順序,就是調(diào)用nack時(shí),之前所有沒有ack的消息都會(huì)被標(biāo)記為nack,多條消息同時(shí)調(diào)用,則調(diào)用的這個(gè)語句執(zhí)行前,如果還有未執(zhí)行回復(fù)確認(rèn)的消息就會(huì)被回復(fù)nack,后續(xù)的消息回復(fù)nack可能只作用于當(dāng)條消息。
注意: nack后的消息也會(huì)被自己消費(fèi);
3.2. basicReject 拒絕消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
簡單理解就是:拒絕deliveryTag對應(yīng)的消息
- 參數(shù)1: 消息
- 參數(shù)2: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列
區(qū)別在于:
- basicReject一次只能拒絕接收一個(gè)消息
- basicNack方法可以支持一次0個(gè)或多個(gè)消息的拒收
3.3. RecoverOk 是否恢復(fù)消息到隊(duì)列
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
是否恢復(fù)消息到隊(duì)列,參數(shù)是是否requeue,true則重新入隊(duì)列,并且盡可能的將之前recover的消息投遞給其他消費(fèi)者消費(fèi),而不是自己再次消費(fèi)。 false則消息會(huì)重新被投遞給自己。
3.4. exchangeDeclare 聲明交換機(jī)
有多個(gè)重載方法,這些方法都是由下面這個(gè)方法中的缺省參數(shù)構(gòu)成的
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;
- exchange:交換機(jī)名稱
- type:交換機(jī)類型 有direct、fanout、topic三種
- durable:設(shè)置是否持久化。durable設(shè)置true表示持久化 ,服務(wù)器重啟會(huì)將Exchange(交換機(jī))存盤。注意:僅設(shè)置此選項(xiàng),不代表消息持久化。即不保證重啟后消息還在。
- autoDelete: 設(shè)置是否自動(dòng)刪除 。.當(dāng)已經(jīng)沒有消費(fèi)者時(shí),服務(wù)器是否可以刪除該Exchange。自動(dòng)刪除的前提是至少有一個(gè)隊(duì)列或者交換機(jī)與這個(gè)交換器綁定的隊(duì)列或者交換器都與之解綁;
- internal:設(shè)置是否內(nèi)置的。如果設(shè)置為true,則表示是內(nèi)置的交換器,客戶端程序無法直接發(fā)送消息到這個(gè)交換器中,只能通過交換器路由到交換器這種方式
- argument:其他一些結(jié)構(gòu)化參數(shù),比如alternate-exchange
3.5. queueDeclare 聲明隊(duì)列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
- 隊(duì)列的名字
- 隊(duì)列里面的消息是否支持持計(jì)化
- 設(shè)置該隊(duì)列,是否可以供對個(gè)消費(fèi)者消費(fèi)
- 是否自動(dòng)刪除消息
- 其他參數(shù)
3.6. queueBind 綁定隊(duì)列
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue: 隊(duì)列名
- exchange: 交換器名稱
- routingKey :路由key或者綁定key
- arguments: 一些參數(shù)
3.7. queueUnbind 解綁隊(duì)列
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue: 隊(duì)列名
- exchange: 交換器名稱
- routingKey :路由key或者綁定key
- arguments: 一些參數(shù)
3.8. exchangeBind 綁定交換機(jī)
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
- destination :目標(biāo)交換器
- source :源交換器
- routingKey 路由key
- arguments: 一些相關(guān)參數(shù)
消息從source交換器轉(zhuǎn)發(fā)到destination交換器存儲(chǔ)在destination綁定的隊(duì)列queue中
3.9. exchangeUnbind 解綁交換機(jī)
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
- destination :目標(biāo)交換器
- source :源交換器
- routingKey 路由key
- arguments: 一些相關(guān)參數(shù)
3.10. basicQos 消息流量
有多個(gè)重載方法,這些方法都是由下面這個(gè)方法中的缺省參數(shù)構(gòu)成的,
void basicQos(int prefetchSize, int prefetchCount, boolean global)
- param1:prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
- param2:prefetchCount,告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息
- param3:global,是否將上面的設(shè)置應(yīng)用于整個(gè)通道
- false:表示只應(yīng)用于當(dāng)前消費(fèi)者
- true:表示當(dāng)前通道的所有消費(fèi)者都應(yīng)用這個(gè)限流策略
消費(fèi)者在接收到隊(duì)列里的消息但沒有返回確認(rèn)結(jié)果之前,隊(duì)列不會(huì)將新的消息分發(fā)給該消費(fèi)者。
隊(duì)列中沒有被消費(fèi)的消息不會(huì)被刪除,還是存在于隊(duì)列中。
一般和channel.basicAck配套使用
3.11. basicAck 消息確認(rèn)
void basicAck(long deliveryTag, boolean multiple) throws IOException
- deliveryTag:該消息的index
- multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
3.12. basicConsume 消息消費(fèi)
該重載方法有點(diǎn)多,具體我就不列舉了,參數(shù)解釋一下:
- queue:隊(duì)列名
- autoAck:是否自動(dòng)確認(rèn)消息
- deliverCallback: 當(dāng)一個(gè)消息發(fā)送過來后的回調(diào)接口
- cancelCallback:當(dāng)一個(gè)消費(fèi)者取消訂閱時(shí)的回調(diào)接口;取消消費(fèi)者訂閱隊(duì)列時(shí)除了使用{@link Channel#basicCancel}之外的所有方式都會(huì)調(diào)用該回調(diào)方法
- callback: 消費(fèi)者對象的回調(diào)接口
- shutdownSignalCallback: 當(dāng)channel/connection 關(guān)閉后回調(diào)
- arguments: 消費(fèi)的一組參數(shù)
- consumerTag: 客戶端生成的用于建立上線文的使用者標(biāo)識(shí)
- nolocal:如果服務(wù)器不應(yīng)將在此通道連接上發(fā)布的消息傳遞給此使用者,則為true;請注意RabbitMQ服務(wù)器上不支持此標(biāo)記
- exclusive: 如果是單個(gè)消費(fèi)者,則為true
啟動(dòng)一個(gè)消費(fèi)者,并返回服務(wù)端生成的消費(fèi)者標(biāo)識(shí)
3.13. basicPublish 發(fā)布消息
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
- exchange:要將消息發(fā)送到的Exchange(交換器)
- routingKey:路由的 key 是哪個(gè)
- 其他參數(shù)
- mandatory:true 如果mandatory標(biāo)記被設(shè)置
- immediate: true 如果immediate標(biāo)記被設(shè)置,注意:RabbitMQ服務(wù)端不支持此標(biāo)記
- props:其它的一些屬性,如:{@linkMessageProperties.PERSISTENT_TEXT_PLAIN}
- body:發(fā)送消息的消息體
3.14. basicGet 主動(dòng)拉取隊(duì)列中的一條消息
GetResponse basicGet(String queue, boolean autoAck)
- 參數(shù)1: 隊(duì)列名
- 參數(shù)2: 是否自動(dòng)確認(rèn)
3.15. basicCancel 取消消費(fèi)者對隊(duì)列的訂閱關(guān)系
void basicCancel(String consumerTag)
consumerTag:服務(wù)器端生成的消費(fèi)者標(biāo)識(shí)
4. 消息確認(rèn)一些觀點(diǎn)
- 消息監(jiān)聽內(nèi)必須使用channel對消息進(jìn)行確認(rèn),不管是確認(rèn)消費(fèi)成功還是確認(rèn)消費(fèi)失敗
- 消息監(jiān)聽內(nèi)的異常處理有兩種方式:
- 內(nèi)部catch后直接處理,然后使用channel對消息進(jìn)行確認(rèn)
- 配置RepublishMessageRecoverer將處理異常的消息發(fā)送到指定隊(duì)列專門處理或記錄
- 監(jiān)聽的方法內(nèi)拋出異常貌似沒有太大用處。因?yàn)閽伋霎惓>退闶侵卦囈卜浅S锌赡軙?huì)繼續(xù)出現(xiàn)異常,當(dāng)重試次數(shù)完了之后消息就只有重啟應(yīng)用才能接收到了,很有可能導(dǎo)致消息消費(fèi)不及時(shí)。當(dāng)然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發(fā)送失敗了呢。。那就可能造成消息消費(fèi)不及時(shí)了。所以即使需要將處理出現(xiàn)異常的消息統(tǒng)一放到另外隊(duì)列去處理,個(gè)人建議兩種方式:
- catch異常后,手動(dòng)發(fā)送到指定隊(duì)列,然后使用channel給rabbitmq確認(rèn)消息已消費(fèi)
- 給Queue綁定死信隊(duì)列,使用nack(requque為false)確認(rèn)消息消費(fèi)失敗
到此這篇關(guān)于RabbitMq中channel接口的幾種常用參數(shù)詳解的文章就介紹到這了,更多相關(guān)channel接口常用參數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作
這篇文章主要介紹了用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08VerifyCodeServlet(一次性驗(yàn)證碼)
這篇文章主要介紹了VerifyCodeServlet一次性驗(yàn)證碼的使用方法2017-05-05Spring定時(shí)服務(wù)QuartZ原理及代碼案例
這篇文章主要介紹了Spring定時(shí)服務(wù)QuartZ原理及代碼案例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11springboot?集成identityserver4身份驗(yàn)證的過程解析
這篇文章主要介紹了springboot?集成identityserver4身份驗(yàn)證的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01基于JAVA的短信驗(yàn)證碼api調(diào)用代碼實(shí)例
這篇文章主要為大家詳細(xì)介紹了基于JAVA的短信驗(yàn)證碼api調(diào)用代碼實(shí)例,感興趣的小伙伴們可以參考一下2016-05-05Java那些鮮為人知的關(guān)鍵字volatile詳析
這篇文章主要給大家介紹了關(guān)于Java那些鮮為人知的關(guān)鍵字volatile的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Springboot使用influxDB時(shí)序數(shù)據(jù)庫的實(shí)現(xiàn)
項(xiàng)目中需要存放大量設(shè)備日志,且需要對其進(jìn)行簡單的數(shù)據(jù)分析,信息提取工作,所以本文就介紹一下Springboot使用influxDB時(shí)序數(shù)據(jù)庫,感興趣的可以了解一下2021-08-08