rabbitmq學(xué)習(xí)系列教程之消息應(yīng)答(autoAck)、隊(duì)列持久化(durable)及消息持久化
一、前言
Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer);
在simple queue 和 work queue(輪詢) 處理中,我們?cè)O(shè)置的消費(fèi)者的消息監(jiān)聽都采用 channel.basicConsume(queue_name,true, consumer),其中參數(shù)二 boolean autoAck為true,但在fair prefetch 公平分發(fā)中設(shè)置為false,這個(gè)設(shè)置在整個(gè)消息隊(duì)列和消息消費(fèi)者之間有什么影響呢?
二、autoAck 參數(shù)的討論
我們都明白一點(diǎn),autoAck設(shè)置為true時(shí),消息隊(duì)列可以不用在意消息消費(fèi)者是否處理完消息,一直發(fā)送全部消息。但在公平分發(fā)中,也就是autoAck設(shè)置為false,在發(fā)送一個(gè)消息后到?jīng)]收到消息消費(fèi)者成功消費(fèi)消息的信息回執(zhí)之間,是不會(huì)繼續(xù)給這個(gè)消息繼續(xù)發(fā)送消息的。
1、當(dāng) autoAck設(shè)置為true時(shí),也就是自動(dòng)確認(rèn)模式,一旦消息隊(duì)列將消息發(fā)送給消息消費(fèi)者后,就會(huì)從內(nèi)存中將這個(gè)消息刪除。
2、當(dāng)autoAck設(shè)置為false時(shí),也就是手動(dòng)模式,如果此時(shí)的有一個(gè)消費(fèi)者宕機(jī),消息隊(duì)列就會(huì)將這條消息繼續(xù)發(fā)送給其他的消費(fèi)者,這樣數(shù)據(jù)在消息消費(fèi)者集群的環(huán)境下,也就算是不丟失了。
在 Boolean autoAck = true的情況下,消息隊(duì)列不會(huì)管消費(fèi)者是否收到了消息,如果消費(fèi)者宕機(jī),消息也就丟失了。
在 Boolean autoAck = false的情況下,如果消費(fèi)者1宕機(jī)了,消息隊(duì)列沒有收到消費(fèi)者發(fā)送回的應(yīng)答,就會(huì)將這個(gè)消息發(fā)送給下一個(gè)消費(fèi)者處理。直到消費(fèi)者處理完這個(gè)消息,并向消息隊(duì)列發(fā)送了一個(gè)消息應(yīng)答,告訴消息隊(duì)列此時(shí)這個(gè)消息已經(jīng)處理完成,消息隊(duì)列才會(huì)將這個(gè)消息從內(nèi)存中刪除。
由此我們可以思考一個(gè)問題:從上面兩個(gè)設(shè)置中,我們當(dāng)然會(huì)認(rèn)為false比較好了,但大家可能會(huì)忽略一個(gè)小問題,消息隊(duì)列保存的消息是在內(nèi)存中的,消息隊(duì)列宕機(jī)了,內(nèi)存中的消息也就清除了,如何做到消息的持久化保存呢?
三、rabbitmq 隊(duì)列持久化操作
我們之前在消息的生產(chǎn)者和消息的消費(fèi)者中都聲明了一個(gè)消息隊(duì)列。
channel.queueDeclare(queue_name, false, false, false, null);
他的源碼介紹為:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
其中各項(xiàng)參數(shù)的含義:
queue:聲明隊(duì)列的名稱
durable:如果我們聲明一個(gè)持久隊(duì)列,則為true(該隊(duì)列將在服務(wù)器重啟后保留下來)
exclusive:如果我們聲明一個(gè)獨(dú)占隊(duì)列,則為true(僅限此連接)
autoDelete:如果我們聲明一個(gè)自動(dòng)刪除隊(duì)列(服務(wù)器將在不再使用它時(shí)將其刪除)
arguments:隊(duì)列的其他屬性(構(gòu)造參數(shù))
【擴(kuò)展:】arguments參數(shù)干啥用的?
當(dāng)前的arguments
參數(shù)的含義,用于設(shè)定隊(duì)列的屬性,如下所示:
1、x-expires
設(shè)定隊(duì)列有效期
。表示隊(duì)列在指定時(shí)間內(nèi)未使用,則會(huì)刪除。
2、x-message-ttl
設(shè)定消息延遲發(fā)送時(shí)間
。
3、x-dead-letter-exchange
設(shè)置死信交換機(jī)
。
4、x-dead-letter-routing-key
設(shè)置路由
。
參照文章:
rabbitmq創(chuàng)建queue時(shí)arguments參數(shù)注釋
小細(xì)節(jié):從上面的參數(shù)信息中我們發(fā)現(xiàn)一個(gè)參數(shù)durable,發(fā)現(xiàn)這個(gè)參數(shù)是聲明隊(duì)列為持久化隊(duì)列,那我們改成 true 是否就可以了呢?我們嘗試下!
修改send代碼中的聲明隊(duì)列:
boolean durable = true; channel.queueDeclare(queue_name, durable, false, false, null);
運(yùn)行起來,結(jié)果。。
注意:出現(xiàn)這種情況的原因是我的rabbitmq中本身就存在一個(gè)設(shè)置好了的queue,如下所示:
如果在已存在的消息隊(duì)列上,依據(jù)修改代碼變更持久化隊(duì)列操作,則會(huì)出現(xiàn)如上所述的異常信息。
但如果rabbitmq中不存在對(duì)應(yīng)的消息隊(duì)列時(shí),則不會(huì)造成影響。
結(jié)論:
rabbitmq不允許對(duì)一個(gè)已存在的隊(duì)列重新定義參數(shù)信息。
由上面的測(cè)試發(fā)現(xiàn):
1、如果在localhost:15672中刪除指定的queue,則可以創(chuàng)建出一個(gè)持久化隊(duì)列。
2、重新定義一個(gè)網(wǎng)址上不存在
的名稱作為持久化隊(duì)列。
最后再強(qiáng)調(diào)一點(diǎn):
消息生產(chǎn)者和消息消費(fèi)者的隊(duì)列聲明(隊(duì)列設(shè)置),必須保持一致。
原因:rabbitmq不允許對(duì)一個(gè)已存在的隊(duì)列重新定義參數(shù)信息
有些大佬說無需在消費(fèi)者中聲明隊(duì)列,其實(shí)最好還是需要聲明,原因在于,如果rabbitmq中不存在指定的queue_name的消息隊(duì)列時(shí),運(yùn)行代碼將會(huì)出現(xiàn)報(bào)錯(cuò)信息?。?/p>
四、2019.11.04 問題補(bǔ)充
上面的兩個(gè)參數(shù)信息消息應(yīng)答(autoAck)與消息持久化(durable),都往持久化的方向設(shè)置了,消息
會(huì)持久化保存嗎?
答案:錯(cuò)。
1、消息應(yīng)答設(shè)置為手動(dòng)模式,只是確保消息能夠正常的被消費(fèi)掉,而并非標(biāo)識(shí)消息的持久化。
2、durable設(shè)置為true,只是說我們?cè)O(shè)置一個(gè)消息隊(duì)列的屬性為持久化隊(duì)列
,在rabbitmq中有很多個(gè)通道和隊(duì)列,并非
標(biāo)識(shí)整體的消息
就是持久化
了。
為什么說按照上述設(shè)置條件,設(shè)定了消息隊(duì)列后,消息隊(duì)列中的消息還是不能持久化保存呢?
消息生產(chǎn)者生產(chǎn)50個(gè)消息并放入消息隊(duì)列中
重啟rabbitmq服務(wù)(模擬宕機(jī))
重啟完成后,訪問 localhost:15672 查看 Queues屬性,
發(fā)現(xiàn)消息隊(duì)列在重啟服務(wù)后是存在的,但其中的消息卻不存在了。
要想徹底實(shí)現(xiàn)服務(wù)宕機(jī)等操作后,消息依舊能夠?qū)崿F(xiàn)持久化保存(硬盤保存),還需要繼續(xù)進(jìn)行學(xué)習(xí),研究。
五、2019.11.07消息的持久化
通過上面的測(cè)試我們發(fā)現(xiàn):
durable 只是表明消息隊(duì)列的持久化,不表示消息的持久化。
在消息生產(chǎn)者生產(chǎn)消息推送至消息隊(duì)列中(或消息轉(zhuǎn)發(fā)器)時(shí),我們使用了一個(gè)方法
channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
其中的參數(shù)三 BasicProperties props表示額外配置屬性。
那么這個(gè)屬性在源碼中有什么呢?
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties { private String contentType;//消息類型如:text/plain private String contentEncoding;//編碼 private Map<String,Object> headers; private Integer deliveryMode;//1:nonpersistent 2:persistent private Integer priority;//優(yōu)先級(jí) private String correlationId; private String replyTo;//反饋隊(duì)列 private String expiration;//expiration到期時(shí)間 private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId; ...
從源碼中我們看到 BasicProperties中存在一條屬性 deliveryMode,
1表示不持久化; 2表示持久化!
如何使用代碼實(shí)現(xiàn)呢?
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import cn.linkpower.util.MqConnectUtil; /** * 公平分發(fā)--誰做的快誰就多做!<br> * 只有在消息消費(fèi)者成功消費(fèi)消息,發(fā)送消費(fèi)成功的指令給隊(duì)列后,消息隊(duì)列才會(huì)繼續(xù)向該消費(fèi)者發(fā)送下一條消息指令。<br> * @author 76519 * */ public class Send { private static final String queue_name = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1、建立連接 Connection mqConnection = MqConnectUtil.getMqConnection(); //2、建立信道(通道) Channel channel = mqConnection.createChannel(); //3、聲明隊(duì)列(開啟持久化) boolean durable = true; channel.queueDeclare(queue_name, durable, false, false, null); //公平分發(fā)--- //為了開啟公平分發(fā)操作,在消息消費(fèi)者發(fā)送確認(rèn)收到的指示后,消息隊(duì)列才會(huì)給這個(gè)消費(fèi)者繼續(xù)發(fā)送下一條消息。 //此處的 1 表示 限制發(fā)送給每個(gè)消費(fèi)者每次最大的消息數(shù)。 channel.basicQos(1); //4、發(fā)送消息 for (int i = 0; i < 10; i++) { String string = "hello xiangjiao "+i; System.out.println("send msg = "+string); //發(fā)送消息 //channel.basicPublish("", queue_name, null, string.getBytes()); //消息持久化測(cè)試 Builder builder = new Builder(); builder.deliveryMode(2); BasicProperties properties = builder.build(); channel.basicPublish("", queue_name, properties, string.getBytes()); //消息發(fā)送慢一點(diǎn) Thread.sleep(i*5); } //5、使用完畢后,需要及時(shí)的關(guān)閉流應(yīng)用 channel.close(); mqConnection.close(); } }
測(cè)試操作:
1、運(yùn)行代碼,查看 local’host:15672 登陸指定的賬號(hào),查詢queue信息
2、重啟rabbitmq服務(wù)。
3、重啟后,再次查看 web 控制臺(tái)
發(fā)現(xiàn):當(dāng)重新完全啟動(dòng) rabbitmq 后,他會(huì)自動(dòng)加載之前的消息至消息隊(duì)列中。
但是此時(shí)并不能說明問題,我們是否忽略了一點(diǎn),你確定了這個(gè)消息隊(duì)列的消息了沒有?
so 我們運(yùn)行消息消費(fèi)者 查看這個(gè)消息隊(duì)列里面的消息到底是什么?
六、2022.02.09 增加隊(duì)列持久化說明
在之前的代碼中,設(shè)置隊(duì)列屬性為createChannel.queueDeclare(simpleQueueName, false, false, false, null)
,其中參數(shù)二
代表該隊(duì)列是否是一個(gè)持久化隊(duì)列
,此處設(shè)置的為false
,表示非持久化
。
這是什么含義呢?
執(zhí)行消息創(chuàng)建并添加至隊(duì)列的代碼邏輯。此時(shí)隊(duì)列中存在數(shù)據(jù),其次也存在該隊(duì)列。
將Rabbitmq重啟,再次查看web:
/sbin/service rabbitmq-server stop
/sbin/service rabbitmq-server start
此時(shí)通過web界面得知,該隊(duì)列
在rabbitmq重啟后
,隊(duì)列沒了!
結(jié)語
到此為止,消息持久化、隊(duì)列持久化 算是琢磨個(gè)差不多了。
到此這篇關(guān)于rabbitmq學(xué)習(xí)系列教程之消息應(yīng)答(autoAck)、隊(duì)列持久化(durable)及消息持久化的文章就介紹到這了,更多相關(guān)rabbitmq消息應(yīng)答隊(duì)列持久化消息持久化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Java找出兩個(gè)List中的重復(fù)元素三種方法
在Java編程中,我們經(jīng)常需要找出兩個(gè)列表(List)中的重復(fù)元素,在本文中,我們將探討三種方法來實(shí)現(xiàn)這一目標(biāo),需要的朋友可以參考下2023-10-10java操作json對(duì)象出現(xiàn)StackOverflow錯(cuò)誤的問題及解決
這篇文章主要介紹了java操作json對(duì)象出現(xiàn)StackOverflow錯(cuò)誤的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06利用java實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)
這篇文章主要給大家介紹了關(guān)于利用java實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04IDEA2019.3配置Hibernate的詳細(xì)教程(未使用IDEA的自動(dòng)化)
這篇文章主要介紹了IDEA2019.3配置Hibernate的詳細(xì)教程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05Spring Security 安全認(rèn)證的示例代碼
這篇文章主要介紹了Spring Security 安全認(rèn)證的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10