RabbitMq中channel接口的幾種常用參數(shù)詳解
1. 背景概述
為了保證消息從隊列可靠地達(dá)到消費者, RabbitMQ 提供了消息確認(rèn)機(jī)制( message acknowledgement), 消費者在訂閱隊列時,可以指定autoAck參數(shù),
- 當(dāng)autoAck 等于false時,RabbitMQ會等待消費者顯式地回復(fù)確認(rèn)信號后才從內(nèi)存(或者磁盤)中移去消息(實質(zhì)上是先打上刪除標(biāo)記,之后再刪除) 。
- 當(dāng)autoAck 等于true時,RabbitMQ 會自動把發(fā)送出去的消息置為確認(rèn), 然后從內(nèi)存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。
采用消息確認(rèn)機(jī)制后,只要設(shè)置autoAck 參數(shù)為false ,消費者就有足夠的時間處理消息(任務(wù)) ,不用擔(dān)心處理消息過程中消費者進(jìn)程掛掉后消息丟失的問題,因為RabbitMQ 會一直等待持有消息直到消費者顯式調(diào)用Basic.Ack 命令為止。
當(dāng)autoAck 參數(shù)置為false ,對于RabbitMQ 服務(wù)端而言,隊列中的消息分成了兩個部分:一部分是等待投遞給消費者的消息:一部分是己經(jīng)投遞給消費者,但是還沒有收到消費者確認(rèn)信號的消息。如果RabbitMQ 一直沒有收到消費者的確認(rèn)信號,并且消費此消息的消費者己經(jīng)斷開連接, 則RabbitMQ 會安排該消息重新進(jìn)入隊列,等待投遞給下一個消費者,當(dāng)然也有可能還是原來的那個消費者。
RabbitMQ 不會為未確認(rèn)的消息設(shè)置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據(jù)是消費該消息的消費者連接是否己經(jīng)斷開, 這么設(shè)計的原因是RabbitMQ 允許消費者消費一條消息的時間可以很久很久。
RabbtiMQ 的Web 管理平臺(15672端口)上可以看到當(dāng)前隊列中的" Ready" 狀態(tài)和"Unacknowledged" 狀態(tài)的消息數(shù),分別對應(yīng)上文中的等待投遞給消費者的消息數(shù)和己經(jīng)投遞給消費者但是未收到確認(rèn)信號的消息數(shù)
也可以通過相應(yīng)的命令來查看上述信息:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
在消費者接收到消息后,如果想明確拒絕當(dāng)前的消息而不是確認(rèn),那么應(yīng)該怎么做呢?
RabbitMQ 在2 .0.0 版本開始引入了Basic.Reject 這個命令,消費者客戶端可以調(diào)用與其對應(yīng)的channel.basicReject 方法來告訴RabbitMQ 拒絕這個消息。
Channel 類中的basicReject 方法定義如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中deliveryTag 可以看作消息的編號,它是一個64 位的長整型值,最大值是9223372036854775807, 如果requeue 參數(shù)設(shè)置為true ,則RabbitMQ 會重新將這條消息存入隊列,以便可以發(fā)送給下一個訂閱的消費者, 如果requeue 參數(shù)設(shè)置為false ,則RabbitMQ立即會把消息從隊列中移除,而不會把它發(fā)送給新的消費者。
Basic.Reject 命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack 這個命令, 消費者客戶端可以調(diào)用channel.basicNack 方法來實現(xiàn),方法定義如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中deliveryTag 和requeue 的含義可以參考basicReject 方法。
- multiple 參數(shù)設(shè)置為false 則表示拒絕編號為deliveryTag的這一條消息,這時候basicNack 和basicReject 方法一樣;
- multiple 參數(shù)設(shè)置為true 則表示拒絕deliveryTag 編號之前所有未被當(dāng)前消費者確認(rèn)的消息。
注意要點:
將channel.basicReject 或者channel.basicNack 中的requeue 設(shè)置為false ,可以啟用" 死信隊列 "的功能。
死信隊列可以通過檢測被拒絕或者未送達(dá)的消息來追蹤問題。 對于requeue , AMQP 中還有一個命令Basic.Recover 具備可重入隊列的特性。其對應(yīng)的客戶端方法為:
1.Basic.RecoverOk basicRecover() throws IOException;
2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
這個channel.basicRecover 方法用來請求RabbitMQ 重新發(fā)送還未被確認(rèn)的消息。
如果requeue 參數(shù)設(shè)置為true, 則未被確認(rèn)的消息會被重新加入到隊列中, 這樣對于同一條消息來說,可能會被分配給與之前不同的消費者。
如果requeue 參數(shù)設(shè)置為false ,那么同一條消息會被分配給與之前相同的消費者, 默認(rèn)情況下,如果不設(shè)置requeue 這個參數(shù),相當(dāng)于channel.basicRecover(true) ,即requeue 默認(rèn)為true
2. 通常參數(shù)解釋
- consumerTag :會話的標(biāo)簽,是固定的 ;
- deliveryTag : 每次接收消息+1,可以做此消息處理通道的名字。
因此 deliveryTag 可以用來回傳告訴 rabbitmq 這個消息處理成功 清除此消息(basicAck方法)。
3. Channel一些Api解釋
3.1. basicNack 不確認(rèn)消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
簡單理解就是: 不確認(rèn)deliveryTag對應(yīng)的消息
- 參數(shù)1: 消息
- 參數(shù)2: 是否應(yīng)用于多消息
- 參數(shù)3: 是否重新放回隊列,否則丟棄或者進(jìn)入死信隊列
第二個參數(shù),怎么理解basic.nack多消息,比如現(xiàn)在有多條消息去調(diào)用這個nack方法,他是怎么執(zhí)行的?
- 有個先后順序,就是調(diào)用nack時,之前所有沒有ack的消息都會被標(biāo)記為nack,多條消息同時調(diào)用,則調(diào)用的這個語句執(zhí)行前,如果還有未執(zhí)行回復(fù)確認(rèn)的消息就會被回復(fù)nack,后續(xù)的消息回復(fù)nack可能只作用于當(dāng)條消息。
注意: nack后的消息也會被自己消費;
3.2. basicReject 拒絕消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
簡單理解就是:拒絕deliveryTag對應(yīng)的消息
- 參數(shù)1: 消息
- 參數(shù)2: 是否重新放回隊列,否則丟棄或者進(jìn)入死信隊列
區(qū)別在于:
- basicReject一次只能拒絕接收一個消息
- basicNack方法可以支持一次0個或多個消息的拒收
3.3. RecoverOk 是否恢復(fù)消息到隊列
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
是否恢復(fù)消息到隊列,參數(shù)是是否requeue,true則重新入隊列,并且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。 false則消息會重新被投遞給自己。
3.4. exchangeDeclare 聲明交換機(jī)
有多個重載方法,這些方法都是由下面這個方法中的缺省參數(shù)構(gòu)成的
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;
- exchange:交換機(jī)名稱
- type:交換機(jī)類型 有direct、fanout、topic三種
- durable:設(shè)置是否持久化。durable設(shè)置true表示持久化 ,服務(wù)器重啟會將Exchange(交換機(jī))存盤。注意:僅設(shè)置此選項,不代表消息持久化。即不保證重啟后消息還在。
- autoDelete: 設(shè)置是否自動刪除 。.當(dāng)已經(jīng)沒有消費者時,服務(wù)器是否可以刪除該Exchange。自動刪除的前提是至少有一個隊列或者交換機(jī)與這個交換器綁定的隊列或者交換器都與之解綁;
- internal:設(shè)置是否內(nèi)置的。如果設(shè)置為true,則表示是內(nèi)置的交換器,客戶端程序無法直接發(fā)送消息到這個交換器中,只能通過交換器路由到交換器這種方式
- argument:其他一些結(jié)構(gòu)化參數(shù),比如alternate-exchange
3.5. queueDeclare 聲明隊列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
- 隊列的名字
- 隊列里面的消息是否支持持計化
- 設(shè)置該隊列,是否可以供對個消費者消費
- 是否自動刪除消息
- 其他參數(shù)
3.6. queueBind 綁定隊列
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue: 隊列名
- exchange: 交換器名稱
- routingKey :路由key或者綁定key
- arguments: 一些參數(shù)
3.7. queueUnbind 解綁隊列
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue: 隊列名
- exchange: 交換器名稱
- routingKey :路由key或者綁定key
- arguments: 一些參數(shù)
3.8. exchangeBind 綁定交換機(jī)
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
- destination :目標(biāo)交換器
- source :源交換器
- routingKey 路由key
- arguments: 一些相關(guān)參數(shù)
消息從source交換器轉(zhuǎn)發(fā)到destination交換器存儲在destination綁定的隊列queue中
3.9. exchangeUnbind 解綁交換機(jī)
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
- destination :目標(biāo)交換器
- source :源交換器
- routingKey 路由key
- arguments: 一些相關(guān)參數(shù)
3.10. basicQos 消息流量
有多個重載方法,這些方法都是由下面這個方法中的缺省參數(shù)構(gòu)成的,
void basicQos(int prefetchSize, int prefetchCount, boolean global)
- param1:prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
- param2:prefetchCount,告訴rabbitmq不要一次性給消費者推送大于N個消息
- param3:global,是否將上面的設(shè)置應(yīng)用于整個通道
- false:表示只應(yīng)用于當(dāng)前消費者
- true:表示當(dāng)前通道的所有消費者都應(yīng)用這個限流策略
消費者在接收到隊列里的消息但沒有返回確認(rèn)結(jié)果之前,隊列不會將新的消息分發(fā)給該消費者。
隊列中沒有被消費的消息不會被刪除,還是存在于隊列中。
一般和channel.basicAck配套使用
3.11. basicAck 消息確認(rèn)
void basicAck(long deliveryTag, boolean multiple) throws IOException
- deliveryTag:該消息的index
- multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
3.12. basicConsume 消息消費
該重載方法有點多,具體我就不列舉了,參數(shù)解釋一下:
- queue:隊列名
- autoAck:是否自動確認(rèn)消息
- deliverCallback: 當(dāng)一個消息發(fā)送過來后的回調(diào)接口
- cancelCallback:當(dāng)一個消費者取消訂閱時的回調(diào)接口;取消消費者訂閱隊列時除了使用{@link Channel#basicCancel}之外的所有方式都會調(diào)用該回調(diào)方法
- callback: 消費者對象的回調(diào)接口
- shutdownSignalCallback: 當(dāng)channel/connection 關(guān)閉后回調(diào)
- arguments: 消費的一組參數(shù)
- consumerTag: 客戶端生成的用于建立上線文的使用者標(biāo)識
- nolocal:如果服務(wù)器不應(yīng)將在此通道連接上發(fā)布的消息傳遞給此使用者,則為true;請注意RabbitMQ服務(wù)器上不支持此標(biāo)記
- exclusive: 如果是單個消費者,則為true
啟動一個消費者,并返回服務(wù)端生成的消費者標(biāo)識
3.13. basicPublish 發(fā)布消息
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
- exchange:要將消息發(fā)送到的Exchange(交換器)
- routingKey:路由的 key 是哪個
- 其他參數(shù)
- mandatory:true 如果mandatory標(biāo)記被設(shè)置
- immediate: true 如果immediate標(biāo)記被設(shè)置,注意:RabbitMQ服務(wù)端不支持此標(biāo)記
- props:其它的一些屬性,如:{@linkMessageProperties.PERSISTENT_TEXT_PLAIN}
- body:發(fā)送消息的消息體
3.14. basicGet 主動拉取隊列中的一條消息
GetResponse basicGet(String queue, boolean autoAck)
- 參數(shù)1: 隊列名
- 參數(shù)2: 是否自動確認(rèn)
3.15. basicCancel 取消消費者對隊列的訂閱關(guān)系
void basicCancel(String consumerTag)
consumerTag:服務(wù)器端生成的消費者標(biāo)識
4. 消息確認(rèn)一些觀點
- 消息監(jiān)聽內(nèi)必須使用channel對消息進(jìn)行確認(rèn),不管是確認(rèn)消費成功還是確認(rèn)消費失敗
- 消息監(jiān)聽內(nèi)的異常處理有兩種方式:
- 內(nèi)部catch后直接處理,然后使用channel對消息進(jìn)行確認(rèn)
- 配置RepublishMessageRecoverer將處理異常的消息發(fā)送到指定隊列專門處理或記錄
- 監(jiān)聽的方法內(nèi)拋出異常貌似沒有太大用處。因為拋出異常就算是重試也非常有可能會繼續(xù)出現(xiàn)異常,當(dāng)重試次數(shù)完了之后消息就只有重啟應(yīng)用才能接收到了,很有可能導(dǎo)致消息消費不及時。當(dāng)然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發(fā)送失敗了呢。。那就可能造成消息消費不及時了。所以即使需要將處理出現(xiàn)異常的消息統(tǒng)一放到另外隊列去處理,個人建議兩種方式:
- catch異常后,手動發(fā)送到指定隊列,然后使用channel給rabbitmq確認(rèn)消息已消費
- 給Queue綁定死信隊列,使用nack(requque為false)確認(rèn)消息消費失敗
到此這篇關(guān)于RabbitMq中channel接口的幾種常用參數(shù)詳解的文章就介紹到這了,更多相關(guān)channel接口常用參數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作
這篇文章主要介紹了用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08springboot?集成identityserver4身份驗證的過程解析
這篇文章主要介紹了springboot?集成identityserver4身份驗證的相關(guān)知識,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01Java那些鮮為人知的關(guān)鍵字volatile詳析
這篇文章主要給大家介紹了關(guān)于Java那些鮮為人知的關(guān)鍵字volatile的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Springboot使用influxDB時序數(shù)據(jù)庫的實現(xiàn)
項目中需要存放大量設(shè)備日志,且需要對其進(jìn)行簡單的數(shù)據(jù)分析,信息提取工作,所以本文就介紹一下Springboot使用influxDB時序數(shù)據(jù)庫,感興趣的可以了解一下2021-08-08