RabbitMQ消息隊(duì)列之持久化機(jī)制詳解
一、RabbitMQ 持久化機(jī)制
1、RabbitMQ 持久化概述
持久化,即將原本存在于內(nèi)存中的數(shù)據(jù)寫入到磁盤上永久保存數(shù)據(jù),防止服務(wù)宕機(jī)時(shí)內(nèi)存數(shù)據(jù)的丟失。
Rabbitmq 的持久化分為隊(duì)列持久化、消息持久化和交換器持久化。
對(duì)于消息來(lái)說(shuō),不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。
持久化的消息會(huì)同時(shí)寫入磁盤和內(nèi)存(加快讀取速度),非持久化消息會(huì)在內(nèi)存不夠用時(shí),將消息寫入磁盤(一般重啟之后就沒(méi)有了)。
2、隊(duì)列持久化
隊(duì)列的持久化是在定義隊(duì)列時(shí)的通過(guò) durable
參數(shù)來(lái)決定的,當(dāng) durable 為 true 時(shí),才代表隊(duì)列會(huì)持久化。例如:
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //第二個(gè)餐胡設(shè)置為true,代表隊(duì)列持久化 channel.queueDeclare("queue.persistent.name", true, false, false, null);
關(guān)鍵的是第二個(gè)參數(shù)設(shè)置為 true,即 durable = true。Channel 類中 queueDeclare 的完整定義如下:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
參數(shù)說(shuō)明:
- queue:queue 的名稱
- exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次申明它的連接可見(jiàn),并在連接斷開時(shí)自動(dòng)刪除。這里需要注意三點(diǎn):
- 排他隊(duì)列是基于連接可見(jiàn)的,同一連接的不同信道是可以同時(shí)訪問(wèn)同一連接創(chuàng)建的排他隊(duì)列;
- “首次”,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的,這個(gè)與普通隊(duì)列不同;
- 即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊(duì)列都會(huì)被自動(dòng)刪除的,這種隊(duì)列適用于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場(chǎng)景。
- autoDelete:自動(dòng)刪除,如果該隊(duì)列沒(méi)有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。
總結(jié):如果將 queue 的持久化標(biāo)識(shí) durable 設(shè)置為 true,則代表是一個(gè)持久的隊(duì)列。當(dāng)服務(wù)重啟之后,隊(duì)列仍然會(huì)存在,這是因?yàn)榉?wù)會(huì)把持久化的 queue 存放在硬盤上,當(dāng)服務(wù)重啟的時(shí)候,會(huì)重新加載這些被持久化的 queue。
3、消息持久化
隊(duì)列是可以被持久化,但是里面的消息是否為持久化那還要看消息的持久化設(shè)置。
也就是說(shuō),重啟之前 queue 里面如果還有未發(fā)出去的消息的話,重啟之后,消息是否還存在隊(duì)列里面就要取決于在發(fā)送消息時(shí)對(duì)消息的設(shè)置。
消息持久化的實(shí)現(xiàn)需要在發(fā)送消息時(shí)設(shè)置消息的持久化標(biāo)識(shí),例如:
channel.basicPublish("exchange01", "routing_key01", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_message".getBytes());
方法原型是:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這里關(guān)鍵的是 BasicProperties props
這個(gè)參數(shù),它的定義如下:
public BasicProperties( String contentType,//消息類型如:text/plain String contentEncoding,//編碼 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//優(yōu)先級(jí) String correlationId, String replyTo,//反饋隊(duì)列 String expiration,//expiration到期時(shí)間 String messageId, Date timestamp, String type, String userId, String appId, String clusterId )
其中 deliveryMode=1
代表不持久化, deliveryMode=2
代表持久化。而代碼實(shí)現(xiàn)中的 MessageProperties.PERSISTENT_PLAIN
值是官方提供的一個(gè)將 deliveryMode 設(shè)置為 2 的 BasicProperties 的對(duì)象:
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties( "text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null );
除此之外,我們也可以使用另一種方式來(lái)設(shè)置消息持久化標(biāo)志位:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 將 deliveryMode 值設(shè)置為 2 表示消息持久化 AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange01", "routing_key01", properties, "persistent_message".getBytes());
至此,我們可以知道:當(dāng) broker 服務(wù)其重啟后,想要消息不丟失,既需要設(shè)置隊(duì)列持久化,也需要設(shè)置消息持久化。
擴(kuò)展知識(shí):
basicPublish 方法還有另外兩個(gè)重載方法:
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
這里有兩個(gè)關(guān)鍵的參數(shù): mandatory
和 immediate
。這兩個(gè)標(biāo)識(shí)位都有當(dāng)消息傳遞過(guò)程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能。下面簡(jiǎn)單講解下這兩個(gè)標(biāo)識(shí)位:
1)mandatory
- 當(dāng) mandatory 標(biāo)志位設(shè)置為 true 時(shí):如果 exchange 根據(jù)自身類型和消息 routeKey 無(wú)法找到一個(gè)符合條件的 queue,那么會(huì)調(diào)用
basic.return
方法將消息返回給生產(chǎn)者(Basic.Return + Content-Header + Content-Body); - 當(dāng) mandatory 設(shè)置為 false 時(shí),如果出現(xiàn)上述情形的話,broker 會(huì)直接將消息扔掉。
2)immediate
- 當(dāng) immediate 標(biāo)志位設(shè)置為 true 時(shí):如果 exchange 在將消息路由到 queue(s) 時(shí)發(fā)現(xiàn)對(duì)于的 queue 上么有消費(fèi)者,那么這條消息不會(huì)放入隊(duì)列中。當(dāng)與消息 routeKey 關(guān)聯(lián)的所有 queue(一個(gè)或者多個(gè))都沒(méi)有消費(fèi)者時(shí),該消息會(huì)通過(guò)
basic.return
方法返還給生產(chǎn)者。
概括來(lái)說(shuō)就是:
- mandatory 標(biāo)志告訴服務(wù)器至少將該消息 route 到一個(gè)隊(duì)列中,否則將消息返還給生產(chǎn)者;
- immediate 標(biāo)志告訴服務(wù)器如果該消息關(guān)聯(lián)的 queue上有消費(fèi)者,則馬上將消息投遞給它,如果所有 queue 都沒(méi)有消費(fèi)者,直接把消息返還給生產(chǎn)者,不用將消息入隊(duì)列等待消費(fèi)者了。
4、交換器持久化
對(duì)于消息的可靠性來(lái)說(shuō),只需要設(shè)置隊(duì)列的持久化和消息的持久化即可。exchange 的持久化并沒(méi)有什么影響,但是,如果 exchange 不設(shè)置持久化的話,當(dāng) broker 服務(wù)重啟之后,exchange 將不復(fù)存在,這樣會(huì)導(dǎo)致消息發(fā)送者 producer 無(wú)法正常發(fā)送消息。
所以,建議同樣設(shè)置 exchange 的持久化。而 exchange 的持久化設(shè)置也特別簡(jiǎn)單,設(shè)置方法原型如下:
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException;
- exchange:交換器的名稱;
- type:交換器的類型,常見(jiàn)的如
fanout direct topic
; - durable:持久話標(biāo)志位, durable 設(shè)置為
true
表示持久化, 反之為非持久 。
所以,只需要在聲明的時(shí)候?qū)?durable 字段設(shè)置為 true 即可:
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
二、RabbitMQ 知識(shí)擴(kuò)展
1、內(nèi)存告警與內(nèi)存換頁(yè)
1-1、內(nèi)存告警
當(dāng)內(nèi)存使用超過(guò)配置的閾值時(shí),RabbitMQ 會(huì)暫時(shí)阻塞客戶端的連接,并停止接收從客戶端發(fā)來(lái)的消息,以避免服務(wù)崩潰,客戶端與服務(wù)端的心跳檢測(cè)也會(huì)失敗。
當(dāng)出現(xiàn)內(nèi)存告警時(shí),可以通過(guò)管理命令臨時(shí)調(diào)整內(nèi)存大?。?/p>
RabbitMQctl set_vm_memory_high_watermark <fraction>
fraction
為內(nèi)存閾值,RabbitMQ 默認(rèn)是 0.4,表示當(dāng) RabbitMQ 使用的內(nèi)存超過(guò)總內(nèi)存的 40% 時(shí),就會(huì)產(chǎn)生告警并阻塞所有生產(chǎn)則連接。
通過(guò)此命令修改的閾值在 RabbitMQ 重啟之后將會(huì)失效,如果想要設(shè)置的閾值永久有效需要修改配置文件:
# 相對(duì)值,也就是前面的fraction,建議設(shè)置在0.4~0.66之間,不要超過(guò)0.7 vm_memory_high_watermark.relative=0.4 # 絕對(duì)值,單位為KB,MB,GB,對(duì)應(yīng)的臨時(shí)命令是:RabbitMQctl set_vm_memory_high_watermark absolute <value> #vm_memory_high_watermark.absolute=1GB
修改完配置文件后,需要重啟服務(wù)才會(huì)生效。
1-2、內(nèi)存換頁(yè)
在某個(gè) broker 節(jié)點(diǎn)觸及內(nèi)存閾值并阻塞生產(chǎn)者之前,它會(huì)嘗試將隊(duì)列內(nèi)存中的消息換頁(yè)存儲(chǔ)到磁盤以釋放內(nèi)存空間。持久化和非持久化的消息都會(huì)被轉(zhuǎn)儲(chǔ)到磁盤中,其中持久化的消息本身就在磁盤中有一個(gè)備份,所以這里會(huì)將持久化的消息直接從內(nèi)存中清除掉。
默認(rèn)情況下,在內(nèi)存使用達(dá)到設(shè)置的閾值的 50% 時(shí)會(huì)進(jìn)行換頁(yè)操作。也就是說(shuō),在默認(rèn)的內(nèi)存閾值 40% 的情況下,當(dāng)內(nèi)存超過(guò) 40% * 50% = 20%
時(shí)會(huì)經(jīng)行換頁(yè)動(dòng)作。
內(nèi)存換頁(yè)閾值可以通過(guò)在配置文件中設(shè)置來(lái)進(jìn)行調(diào)整:
vm_memory_high_watermark_paging_ratio=0.75
上面的配置將會(huì)在 RabbitMQ 內(nèi)存使用率達(dá)到 30%(假設(shè)內(nèi)存閾值是 0.4)時(shí)進(jìn)行換頁(yè)動(dòng)作,并在 40% 時(shí)阻塞生產(chǎn)者(當(dāng) vm_memory_high_watermark_paging_ratio 的值大于 1 時(shí),相當(dāng)于禁用了換頁(yè)功能)。
2、磁盤告警與配置
2-1、磁盤告警
當(dāng)磁盤剩余空間低于設(shè)置的閾值時(shí),RabbitMQ 同樣會(huì)阻塞生產(chǎn)者,這樣可以避免因非持久化的消息持續(xù)換頁(yè)而耗盡磁盤空間導(dǎo)致服務(wù)崩潰。
默認(rèn)情況下,磁盤的閾值是50M,表示當(dāng)磁盤剩余空間低于50M時(shí),會(huì)阻塞生產(chǎn)者并停止內(nèi)存中消息的換頁(yè)動(dòng)作。
這個(gè)閾值的設(shè)置可以減小,但不能完全消除因磁盤耗盡而導(dǎo)致崩潰的可能性。比如在兩次磁盤空間檢測(cè)期間內(nèi),磁盤空間從大于50M被耗盡到0M。
2-2、修改磁盤告警閾值
可以通過(guò)以下命令臨時(shí)調(diào)整磁盤閾值:
#設(shè)置具體大小,單位為KB/MB/GB RabbitMQctl set_disk_free_limit <disk_limit> #設(shè)置相對(duì)值,建議取值為1.0~2.0(相對(duì)于內(nèi)存的倍數(shù),如內(nèi)存大小是8G,若為1.0,則表示磁盤剩余8G時(shí),阻塞) RabbitMQctl set_disk_free_limit mem_relative <fraction>
如果要永久生效需要對(duì)應(yīng)的配置文件,配置如下(需要重啟生效):
disk_free_limit.relative=2.0 #disk_free_limit_absolute=50MB
這里有個(gè)建議:一個(gè)相對(duì)謹(jǐn)慎的做法是將磁盤閾值設(shè)置為與操作系統(tǒng)所顯示的內(nèi)存大小一致。
3、數(shù)據(jù)寫入磁盤時(shí)機(jī)
- 消息的正常寫入磁盤流程為:消息數(shù)據(jù)寫入到緩存 Buffer 中(大小為 1 M),Buffer 數(shù)據(jù)滿了之后會(huì)寫入內(nèi)存文件中,最后再刷新到磁盤文件中;
- 存在個(gè)固定的刷盤時(shí)間:25ms,也就是不管 Buffe r滿不滿,每隔 25ms,Buffer 里的數(shù)據(jù)及未刷新到磁盤的文件內(nèi)容必定會(huì)刷到磁盤;
- 每次消息寫入后,如果沒(méi)有后續(xù)寫入請(qǐng)求,則會(huì)直接將已寫入的消息刷到磁盤:使用 Erlang 的 receive x after 0 來(lái)實(shí)現(xiàn)。只要進(jìn)程的信箱里沒(méi)有消息,則產(chǎn)生一個(gè) timeout 消息,而 timeout 會(huì)觸發(fā)刷盤操作。
4、磁盤消息格式
消息保存于 $MNESIA/msg_store_persistent/x.rdq 文件中,其中 x 為數(shù)字編號(hào),從 1 開始,每個(gè)文件最大為 16M(16777216),超過(guò)這個(gè)大小會(huì)生成新的文件,文件編號(hào)加 1。
文件中的消息格式如下:
<<Size:64, MsgId:16/binary, MsgBody>>
- MsgId 為 RabbitMQ 通過(guò)
rabbit_guid:gen()
每一個(gè)消息生成的 GUID; - MsgBody 會(huì)包含消息對(duì)應(yīng)的 exchange,routing_keys,消息的內(nèi)容,消息對(duì)應(yīng)的協(xié)議版本,消息內(nèi)容格式(二進(jìn)制還是其它)等等。
5、磁盤文件刪除機(jī)制
- 當(dāng)所有磁盤文件中的垃圾消息(已經(jīng)被刪除的消息)比例大于閾值(GARBAGE_FRACTION = 0.5)時(shí),會(huì)觸發(fā)文件合并操作(至少有三個(gè)文件存在的情況下),以提高磁盤利用率。
- publish 消息時(shí)寫入內(nèi)容,ack 消息時(shí)刪除內(nèi)容(更新該文件的有用數(shù)據(jù)大?。?dāng)一個(gè)文件的有用數(shù)據(jù)等于 0 時(shí),刪除該文件。
到此這篇關(guān)于RabbitMQ消息隊(duì)列之持久化機(jī)制詳解的文章就介紹到這了,更多相關(guān)RabbitMQ持久化機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot結(jié)合mysql主從來(lái)實(shí)現(xiàn)讀寫分離的方法示例
這篇文章主要介紹了springboot結(jié)合mysql主從來(lái)實(shí)現(xiàn)讀寫分離的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04SpringBoot配置MyBatis-Plus實(shí)現(xiàn)增刪查改
本文主要介紹了SpringBoot配置MyBatis-Plus實(shí)現(xiàn)增刪查改,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08淺談SpringBoot項(xiàng)目打成war和jar的區(qū)別
這篇文章主要介紹了淺談SpringBoot項(xiàng)目打成war和jar的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java實(shí)現(xiàn)猜數(shù)字小游戲(有次數(shù)限制)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)猜數(shù)字小游戲,有次數(shù)限制,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-05-05spring boot使用logback實(shí)現(xiàn)多環(huán)境日志配置詳解
這篇文章主要介紹了spring boot使用logback實(shí)現(xiàn)多環(huán)境日志配置詳解,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-08-08