欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMq中channel接口的幾種常用參數(shù)詳解

 更新時(shí)間:2023年08月28日 09:38:31   作者:Alan0517  
這篇文章主要介紹了RabbitMq中channel接口的幾種常用參數(shù)詳解,RabbitMQ 不會(huì)為未確認(rèn)的消息設(shè)置過期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否己經(jīng)斷開,需要的朋友可以參考下

1. 背景概述

為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者, RabbitMQ 提供了消息確認(rèn)機(jī)制( message acknowledgement), 消費(fèi)者在訂閱隊(duì)列時(shí),可以指定autoAck參數(shù),

  • 當(dāng)autoAck 等于false時(shí),RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號后才從內(nèi)存(或者磁盤)中移去消息(實(shí)質(zhì)上是先打上刪除標(biāo)記,之后再刪除) 。
  • 當(dāng)autoAck 等于true時(shí),RabbitMQ 會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn), 然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者是否真正地消費(fèi)到了這些消息。

采用消息確認(rèn)機(jī)制后,只要設(shè)置autoAck 參數(shù)為false ,消費(fèi)者就有足夠的時(shí)間處理消息(任務(wù)) ,不用擔(dān)心處理消息過程中消費(fèi)者進(jìn)程掛掉后消息丟失的問題,因?yàn)镽abbitMQ 會(huì)一直等待持有消息直到消費(fèi)者顯式調(diào)用Basic.Ack 命令為止。

當(dāng)autoAck 參數(shù)置為false ,對于RabbitMQ 服務(wù)端而言,隊(duì)列中的消息分成了兩個(gè)部分:一部分是等待投遞給消費(fèi)者的消息:一部分是己經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號的消息。如果RabbitMQ 一直沒有收到消費(fèi)者的確認(rèn)信號,并且消費(fèi)此消息的消費(fèi)者己經(jīng)斷開連接, 則RabbitMQ 會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者,當(dāng)然也有可能還是原來的那個(gè)消費(fèi)者。

RabbitMQ 不會(huì)為未確認(rèn)的消息設(shè)置過期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否己經(jīng)斷開, 這么設(shè)計(jì)的原因是RabbitMQ 允許消費(fèi)者消費(fèi)一條消息的時(shí)間可以很久很久。

RabbtiMQ 的Web 管理平臺(tái)(15672端口)上可以看到當(dāng)前隊(duì)列中的" Ready" 狀態(tài)和"Unacknowledged" 狀態(tài)的消息數(shù),分別對應(yīng)上文中的等待投遞給消費(fèi)者的消息數(shù)和己經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號的消息數(shù)

也可以通過相應(yīng)的命令來查看上述信息:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

在消費(fèi)者接收到消息后,如果想明確拒絕當(dāng)前的消息而不是確認(rèn),那么應(yīng)該怎么做呢?

RabbitMQ 在2 .0.0 版本開始引入了Basic.Reject 這個(gè)命令,消費(fèi)者客戶端可以調(diào)用與其對應(yīng)的channel.basicReject 方法來告訴RabbitMQ 拒絕這個(gè)消息。

Channel 類中的basicReject 方法定義如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中deliveryTag 可以看作消息的編號,它是一個(gè)64 位的長整型值,最大值是9223372036854775807, 如果requeue 參數(shù)設(shè)置為true ,則RabbitMQ 會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下一個(gè)訂閱的消費(fèi)者, 如果requeue 參數(shù)設(shè)置為false ,則RabbitMQ立即會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消費(fèi)者。

Basic.Reject 命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack 這個(gè)命令, 消費(fèi)者客戶端可以調(diào)用channel.basicNack 方法來實(shí)現(xiàn),方法定義如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中deliveryTag 和requeue 的含義可以參考basicReject 方法。

  • multiple 參數(shù)設(shè)置為false 則表示拒絕編號為deliveryTag的這一條消息,這時(shí)候basicNack 和basicReject 方法一樣;
  • multiple 參數(shù)設(shè)置為true 則表示拒絕deliveryTag 編號之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息。

注意要點(diǎn):

將channel.basicReject 或者channel.basicNack 中的requeue 設(shè)置為false ,可以啟用" 死信隊(duì)列 "的功能。

死信隊(duì)列可以通過檢測被拒絕或者未送達(dá)的消息來追蹤問題。 對于requeue , AMQP 中還有一個(gè)命令Basic.Recover 具備可重入隊(duì)列的特性。其對應(yīng)的客戶端方法為:

1.Basic.RecoverOk basicRecover() throws IOException;

2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

這個(gè)channel.basicRecover 方法用來請求RabbitMQ 重新發(fā)送還未被確認(rèn)的消息。

如果requeue 參數(shù)設(shè)置為true, 則未被確認(rèn)的消息會(huì)被重新加入到隊(duì)列中, 這樣對于同一條消息來說,可能會(huì)被分配給與之前不同的消費(fèi)者。

如果requeue 參數(shù)設(shè)置為false ,那么同一條消息會(huì)被分配給與之前相同的消費(fèi)者, 默認(rèn)情況下,如果不設(shè)置requeue 這個(gè)參數(shù),相當(dāng)于channel.basicRecover(true) ,即requeue 默認(rèn)為true

2. 通常參數(shù)解釋

  • consumerTag :會(huì)話的標(biāo)簽,是固定的 ;
  • deliveryTag : 每次接收消息+1,可以做此消息處理通道的名字。

因此 deliveryTag 可以用來回傳告訴 rabbitmq 這個(gè)消息處理成功 清除此消息(basicAck方法)。

3. Channel一些Api解釋

3.1. basicNack 不確認(rèn)消息

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

簡單理解就是: 不確認(rèn)deliveryTag對應(yīng)的消息

  • 參數(shù)1: 消息
  • 參數(shù)2: 是否應(yīng)用于多消息
  • 參數(shù)3: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列

第二個(gè)參數(shù),怎么理解basic.nack多消息,比如現(xiàn)在有多條消息去調(diào)用這個(gè)nack方法,他是怎么執(zhí)行的?

  • 有個(gè)先后順序,就是調(diào)用nack時(shí),之前所有沒有ack的消息都會(huì)被標(biāo)記為nack,多條消息同時(shí)調(diào)用,則調(diào)用的這個(gè)語句執(zhí)行前,如果還有未執(zhí)行回復(fù)確認(rèn)的消息就會(huì)被回復(fù)nack,后續(xù)的消息回復(fù)nack可能只作用于當(dāng)條消息。

注意: nack后的消息也會(huì)被自己消費(fèi);

3.2. basicReject 拒絕消息

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

簡單理解就是:拒絕deliveryTag對應(yīng)的消息

  • 參數(shù)1: 消息
  • 參數(shù)2: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列

區(qū)別在于:

  • basicReject一次只能拒絕接收一個(gè)消息
  • basicNack方法可以支持一次0個(gè)或多個(gè)消息的拒收

3.3. RecoverOk 是否恢復(fù)消息到隊(duì)列

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

是否恢復(fù)消息到隊(duì)列,參數(shù)是是否requeue,true則重新入隊(duì)列,并且盡可能的將之前recover的消息投遞給其他消費(fèi)者消費(fèi),而不是自己再次消費(fèi)。 false則消息會(huì)重新被投遞給自己。

3.4. exchangeDeclare 聲明交換機(jī)

有多個(gè)重載方法,這些方法都是由下面這個(gè)方法中的缺省參數(shù)構(gòu)成的

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;
  • exchange:交換機(jī)名稱
  • type:交換機(jī)類型 有direct、fanout、topic三種
  • durable:設(shè)置是否持久化。durable設(shè)置true表示持久化 ,服務(wù)器重啟會(huì)將Exchange(交換機(jī))存盤。注意:僅設(shè)置此選項(xiàng),不代表消息持久化。即不保證重啟后消息還在。
  • autoDelete: 設(shè)置是否自動(dòng)刪除 。.當(dāng)已經(jīng)沒有消費(fèi)者時(shí),服務(wù)器是否可以刪除該Exchange。自動(dòng)刪除的前提是至少有一個(gè)隊(duì)列或者交換機(jī)與這個(gè)交換器綁定的隊(duì)列或者交換器都與之解綁;
  • internal:設(shè)置是否內(nèi)置的。如果設(shè)置為true,則表示是內(nèi)置的交換器,客戶端程序無法直接發(fā)送消息到這個(gè)交換器中,只能通過交換器路由到交換器這種方式
  • argument:其他一些結(jié)構(gòu)化參數(shù),比如alternate-exchange

3.5. queueDeclare 聲明隊(duì)列

  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map&lt;String, Object&gt; arguments) throws IOException;
  • 隊(duì)列的名字
  • 隊(duì)列里面的消息是否支持持計(jì)化
  • 設(shè)置該隊(duì)列,是否可以供對個(gè)消費(fèi)者消費(fèi)
  • 是否自動(dòng)刪除消息
  • 其他參數(shù)

3.6. queueBind 綁定隊(duì)列

 Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • queue: 隊(duì)列名
  • exchange: 交換器名稱
  • routingKey :路由key或者綁定key
  • arguments: 一些參數(shù)

3.7. queueUnbind 解綁隊(duì)列

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • queue: 隊(duì)列名
  • exchange: 交換器名稱
  • routingKey :路由key或者綁定key
  • arguments: 一些參數(shù)

3.8. exchangeBind 綁定交換機(jī)

  Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  • destination :目標(biāo)交換器
  • source :源交換器
  • routingKey 路由key
  • arguments: 一些相關(guān)參數(shù)

消息從source交換器轉(zhuǎn)發(fā)到destination交換器存儲(chǔ)在destination綁定的隊(duì)列queue中

3.9. exchangeUnbind 解綁交換機(jī)

 Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;
  • destination :目標(biāo)交換器
  • source :源交換器
  • routingKey 路由key
  • arguments: 一些相關(guān)參數(shù)

3.10. basicQos 消息流量

有多個(gè)重載方法,這些方法都是由下面這個(gè)方法中的缺省參數(shù)構(gòu)成的,

void basicQos(int prefetchSize, int prefetchCount, boolean global)
  • param1:prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對消息本身的大小不限制
  • param2:prefetchCount,告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息
  • param3:global,是否將上面的設(shè)置應(yīng)用于整個(gè)通道
    • false:表示只應(yīng)用于當(dāng)前消費(fèi)者
    • true:表示當(dāng)前通道的所有消費(fèi)者都應(yīng)用這個(gè)限流策略

消費(fèi)者在接收到隊(duì)列里的消息但沒有返回確認(rèn)結(jié)果之前,隊(duì)列不會(huì)將新的消息分發(fā)給該消費(fèi)者。

隊(duì)列中沒有被消費(fèi)的消息不會(huì)被刪除,還是存在于隊(duì)列中。

一般和channel.basicAck配套使用

3.11. basicAck 消息確認(rèn)

 void basicAck(long deliveryTag, boolean multiple) throws IOException
  • deliveryTag:該消息的index
  • multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。

3.12. basicConsume 消息消費(fèi)

該重載方法有點(diǎn)多,具體我就不列舉了,參數(shù)解釋一下:

  • queue:隊(duì)列名
  • autoAck:是否自動(dòng)確認(rèn)消息
  • deliverCallback: 當(dāng)一個(gè)消息發(fā)送過來后的回調(diào)接口
  • cancelCallback:當(dāng)一個(gè)消費(fèi)者取消訂閱時(shí)的回調(diào)接口;取消消費(fèi)者訂閱隊(duì)列時(shí)除了使用{@link Channel#basicCancel}之外的所有方式都會(huì)調(diào)用該回調(diào)方法
  • callback: 消費(fèi)者對象的回調(diào)接口
  • shutdownSignalCallback: 當(dāng)channel/connection 關(guān)閉后回調(diào)
  • arguments: 消費(fèi)的一組參數(shù)
  • consumerTag: 客戶端生成的用于建立上線文的使用者標(biāo)識(shí)
  • nolocal:如果服務(wù)器不應(yīng)將在此通道連接上發(fā)布的消息傳遞給此使用者,則為true;請注意RabbitMQ服務(wù)器上不支持此標(biāo)記
  • exclusive: 如果是單個(gè)消費(fèi)者,則為true

啟動(dòng)一個(gè)消費(fèi)者,并返回服務(wù)端生成的消費(fèi)者標(biāo)識(shí)

3.13. basicPublish 發(fā)布消息

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  • exchange:要將消息發(fā)送到的Exchange(交換器)
  • routingKey:路由的 key 是哪個(gè)
  • 其他參數(shù)
    • mandatory:true 如果mandatory標(biāo)記被設(shè)置
    • immediate: true 如果immediate標(biāo)記被設(shè)置,注意:RabbitMQ服務(wù)端不支持此標(biāo)記
    • props:其它的一些屬性,如:{@linkMessageProperties.PERSISTENT_TEXT_PLAIN}
  • body:發(fā)送消息的消息體

3.14. basicGet 主動(dòng)拉取隊(duì)列中的一條消息

GetResponse basicGet(String queue, boolean autoAck)
  • 參數(shù)1: 隊(duì)列名
  • 參數(shù)2: 是否自動(dòng)確認(rèn)

3.15. basicCancel 取消消費(fèi)者對隊(duì)列的訂閱關(guān)系

void basicCancel(String consumerTag)

consumerTag:服務(wù)器端生成的消費(fèi)者標(biāo)識(shí)

4. 消息確認(rèn)一些觀點(diǎn)

  • 消息監(jiān)聽內(nèi)必須使用channel對消息進(jìn)行確認(rèn),不管是確認(rèn)消費(fèi)成功還是確認(rèn)消費(fèi)失敗
  • 消息監(jiān)聽內(nèi)的異常處理有兩種方式:
    • 內(nèi)部catch后直接處理,然后使用channel對消息進(jìn)行確認(rèn)
    • 配置RepublishMessageRecoverer將處理異常的消息發(fā)送到指定隊(duì)列專門處理或記錄
  • 監(jiān)聽的方法內(nèi)拋出異常貌似沒有太大用處。因?yàn)閽伋霎惓>退闶侵卦囈卜浅S锌赡軙?huì)繼續(xù)出現(xiàn)異常,當(dāng)重試次數(shù)完了之后消息就只有重啟應(yīng)用才能接收到了,很有可能導(dǎo)致消息消費(fèi)不及時(shí)。當(dāng)然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發(fā)送失敗了呢。。那就可能造成消息消費(fèi)不及時(shí)了。所以即使需要將處理出現(xiàn)異常的消息統(tǒng)一放到另外隊(duì)列去處理,個(gè)人建議兩種方式:
    • catch異常后,手動(dòng)發(fā)送到指定隊(duì)列,然后使用channel給rabbitmq確認(rèn)消息已消費(fèi)
    • 給Queue綁定死信隊(duì)列,使用nack(requque為false)確認(rèn)消息消費(fèi)失敗

到此這篇關(guān)于RabbitMq中channel接口的幾種常用參數(shù)詳解的文章就介紹到這了,更多相關(guān)channel接口常用參數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作

    用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作

    這篇文章主要介紹了用Maven打成可執(zhí)行jar,包含maven依賴,本地依賴的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • VerifyCodeServlet(一次性驗(yàn)證碼)

    VerifyCodeServlet(一次性驗(yàn)證碼)

    這篇文章主要介紹了VerifyCodeServlet一次性驗(yàn)證碼的使用方法
    2017-05-05
  • SpringMVC中的http Caching的具體使用

    SpringMVC中的http Caching的具體使用

    本文主要介紹了SpringMVC中的http Caching的具體使用,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-09-09
  • Spring定時(shí)服務(wù)QuartZ原理及代碼案例

    Spring定時(shí)服務(wù)QuartZ原理及代碼案例

    這篇文章主要介紹了Spring定時(shí)服務(wù)QuartZ原理及代碼案例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • Java時(shí)間戳類Instant的使用詳解

    Java時(shí)間戳類Instant的使用詳解

    這篇文章主要為大家詳細(xì)介紹了Java中時(shí)間戳類Instant的使用方法,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Java有一定幫助,需要的可以參考一下
    2022-09-09
  • springboot?集成identityserver4身份驗(yàn)證的過程解析

    springboot?集成identityserver4身份驗(yàn)證的過程解析

    這篇文章主要介紹了springboot?集成identityserver4身份驗(yàn)證的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2024-01-01
  • 基于JAVA的短信驗(yàn)證碼api調(diào)用代碼實(shí)例

    基于JAVA的短信驗(yàn)證碼api調(diào)用代碼實(shí)例

    這篇文章主要為大家詳細(xì)介紹了基于JAVA的短信驗(yàn)證碼api調(diào)用代碼實(shí)例,感興趣的小伙伴們可以參考一下
    2016-05-05
  • Java那些鮮為人知的關(guān)鍵字volatile詳析

    Java那些鮮為人知的關(guān)鍵字volatile詳析

    這篇文章主要給大家介紹了關(guān)于Java那些鮮為人知的關(guān)鍵字volatile的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • java實(shí)現(xiàn)俄羅斯方塊

    java實(shí)現(xiàn)俄羅斯方塊

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)俄羅斯方塊,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-06-06
  • Springboot使用influxDB時(shí)序數(shù)據(jù)庫的實(shí)現(xiàn)

    Springboot使用influxDB時(shí)序數(shù)據(jù)庫的實(shí)現(xiàn)

    項(xiàng)目中需要存放大量設(shè)備日志,且需要對其進(jìn)行簡單的數(shù)據(jù)分析,信息提取工作,所以本文就介紹一下Springboot使用influxDB時(shí)序數(shù)據(jù)庫,感興趣的可以了解一下
    2021-08-08

最新評論