Java RabbitMQ 中的消息長期不消費(fèi)會過期嗎
RabbitMQ 中的消息長期未被消費(fèi)會過期嗎?用過 RabbitMQ 的小伙伴可能都有這樣的疑問,今天松哥就來和大家捋一捋這個問題。
1. 默認(rèn)情況
首先我們來看看默認(rèn)情況。
默認(rèn)情況下,消息是不會過期的,也就是我們平日里在消息發(fā)送時,如果不設(shè)置任何消息過期的相關(guān)參數(shù),那么消息是不會過期的,即使消息沒被消費(fèi)掉,也會一直存儲在隊列中。
這種情況具體代碼就不用我再演示了吧,松哥之前的文章凡是涉及到 RabbitMQ 的,基本上都是這樣的。
2. TTL
TTL(Time-To-Live),消息存活的時間,即消息的有效期。如果我們希望消息能夠有一個存活時間,那么我們可以通過設(shè)置 TTL 來實(shí)現(xiàn)這一需求。如果消息的存活時間超過了 TTL 并且還沒有被消息,此時消息就會變成死信
,關(guān)于死信
以及死信隊列
,松哥后面再和大家介紹。
TTL 的設(shè)置有兩種不同的方式:
- 在聲明隊列的時候,我們可以在隊列屬性中設(shè)置消息的有效期,這樣所有進(jìn)入該隊列的消息都會有一個相同的有效期。
- 在發(fā)送消息的時候設(shè)置消息的有效期,這樣不同的消息就具有不同的有效期。
那如果兩個都設(shè)置了呢?
以時間短的為準(zhǔn)。
當(dāng)我們設(shè)置了消息有效期后,消息過期了就會被從隊列中刪除了(進(jìn)入到死信隊列,后文一樣,不再標(biāo)注),但是兩種方式對應(yīng)的刪除時機(jī)有一些差異:
對于第一種方式,當(dāng)消息隊列設(shè)置過期時間的時候,那么消息過期了就會被刪除,因為消息進(jìn)入 RabbitMQ 后是存在一個消息隊列中,隊列的頭部是最早要過期的消息,所以 RabbitMQ 只需要一個定時任務(wù),從頭部開始掃描是否有過期消息,有的話就直接刪除。對于第二種方式,當(dāng)消息過期后并不會立馬被刪除,而是當(dāng)消息要投遞給消費(fèi)者的時候才會去刪除,因為第二種方式,每條消息的過期時間都不一樣,想要知道哪條消息過期,必須要遍歷隊列中的所有消息才能實(shí)現(xiàn),當(dāng)消息比較多時這樣就比較耗費(fèi)性能,因此對于第二種方式,當(dāng)消息要投遞給消費(fèi)者的時候才去刪除。
介紹完 TTL 之后,接下來我們來看看具體用法。
接下來所有代碼松哥都以 Spring Boot 中封裝的 AMPQ 為例來講解。
2.1 單條消息過期
我們先來看單條消息的過期時間。
首先創(chuàng)建一個 Spring Boot 項目,引入 Web 和 RabbitMQ 依賴,如下:
然后在 application.properties 中配置一下 RabbitMQ 的連接信息,如下:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
接下來稍微配置一下消息隊列:
@Configuration public class QueueConfig { public static final String JAVABOY_QUEUE_DEMO = "javaboy_queue_demo"; public static final String JAVABOY_EXCHANGE_DEMO = "javaboy_exchange_demo"; public static final String HELLO_ROUTING_KEY = "hello_routing_key"; @Bean Queue queue() { return new Queue(JAVABOY_QUEUE_DEMO, true, false, false); } @Bean DirectExchange directExchange() { return new DirectExchange(JAVABOY_EXCHANGE_DEMO, true, false); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()) .with(HELLO_ROUTING_KEY); } }
這個配置類主要干了三件事:配置消息隊列、配置交換機(jī)以及將兩者綁定在一起。
- 首先配置一個消息隊列,new 一個 Queue:第一個參數(shù)是消息隊列的名字;第二個參數(shù)表示消息是否持久化;第三個參數(shù)表示消息隊列是否排他,一般我們都是設(shè)置為 false,即不排他;第四個參數(shù)表示如果該隊列沒有任何訂閱的消費(fèi)者的話,該隊列會被自動刪除,一般適用于臨時隊列。
- 配置一個 DirectExchange 交換機(jī)。
- 將交換機(jī)和隊列綁定到一起。
這段配置應(yīng)該很簡單,沒啥好解釋的,有一個排他性,松哥這里稍微多說兩句:
關(guān)于排他性,如果設(shè)置為 true,則該消息隊列只有創(chuàng)建它的 Connection 才能訪問,其他的 Connection 都不能訪問該消息隊列,如果試圖在不同的連接中重新聲明或者訪問排他性隊列,那么系統(tǒng)會報一個資源被鎖定的錯誤。另一方面,對于排他性隊列而言,當(dāng)連接斷掉的時候,該消息隊列也會自動刪除(無論該隊列是否被聲明為持久性隊列都會被刪除)。
接下來提供一個消息發(fā)送接口,如下:
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { Message message = MessageBuilder.withBody("hello javaboy".getBytes()) .setExpiration("10000") .build(); rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message); } }
在創(chuàng)建 Message 對象的時候我們可以設(shè)置消息的過期時間,這里設(shè)置消息的過期時間為 10 秒。
這就可以啦!
接下來我們啟動項目,進(jìn)行消息發(fā)送測試。當(dāng)消息發(fā)送成功之后,由于沒有消費(fèi)者,所以這條消息并不會被消費(fèi)。打開 RabbitMQ 管理頁面,點(diǎn)擊到 Queues 選項卡,10s 之后,我們會發(fā)現(xiàn)消息已經(jīng)不見了:
很簡單吧!
單條消息設(shè)置過期時間,就是在消息發(fā)送的時候設(shè)置一下消息有效期即可。
2.2 隊列消息過期
給隊列設(shè)置消息過期時間,方式如下:
@Bean Queue queue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args); }
設(shè)置完成后,我們修改消息的發(fā)送邏輯,如下:
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { Message message = MessageBuilder.withBody("hello javaboy".getBytes()) .build(); rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message); } }
可以看到,消息正常發(fā)送即可,不用設(shè)置消息過期時間。
OK,啟動項目,發(fā)送一條消息進(jìn)行測試。查看 RabbitMQ 管理頁面,如下:
可以看到,消息隊列的 Features 屬性為 D 和 TTL,D 表示消息隊列中消息持久化,TTL 則表示消息會過期。
10s 之后刷新頁面,發(fā)現(xiàn)消息數(shù)量已經(jīng)恢復(fù)為 0。
這就是給消息隊列設(shè)置消息過期時間,一旦設(shè)置了,所有進(jìn)入到該隊列的消息都有一個過期時間了。
2.3 特殊情況
還有一種特殊情況,就是將消息的過期時間 TTL 設(shè)置為 0,這表示如果消息不能立馬消費(fèi)則會被立即丟掉,這個特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 參數(shù),之所以所部分代替,是因為 immediate 參數(shù)在投遞失敗會有 basic.return 方法將消息體返回(這個功能可以利用死信隊列來實(shí)現(xiàn))。
具體代碼松哥就不演示了,這個應(yīng)該比較容易。
3. 死信隊列
有小伙伴不禁要問,被刪除的消息去哪了?真的被刪除了嗎?非也非也!這就涉及到死信隊列了,接下來我們來看看死信隊列。
3.1 死信交換機(jī)
死信交換機(jī),Dead-Letter-Exchange 即 DLX。
死信交換機(jī)用來接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息變成死信消息有如下幾種情況:
- 消息被拒絕(Basic.Reject/Basic.Nack) ,井且設(shè)置requeue 參數(shù)為false
- 消息過期
- 隊列達(dá)到最大長度
當(dāng)消息在一個隊列中變成了死信消息后,此時就會被發(fā)送到 DLX,綁定 DLX 的消息隊列則稱為死信隊列。
DLX 本質(zhì)上也是一個普普通通的交換機(jī),我們可以為任意隊列指定 DLX,當(dāng)該隊列中存在死信時,RabbitMQ 就會自動的將這個死信發(fā)布到 DLX 上去,進(jìn)而被路由到另一個綁定了 DLX 的隊列上(即死信隊列)。
3.2 死信隊列
這個好理解,綁定了死信交換機(jī)的隊列就是死信隊列。
3.3 實(shí)踐
我們來看一個簡單的例子。
首先我們來創(chuàng)建一個死信交換機(jī),接著創(chuàng)建一個死信隊列,再將死信交換機(jī)和死信隊列綁定到一起:
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name"; public static final String DLX_QUEUE_NAME = "dlx_queue_name"; public static final String DLX_ROUTING_KEY = "dlx_routing_key"; /** * 配置死信交換機(jī) * * @return */ @Bean DirectExchange dlxDirectExchange() { return new DirectExchange(DLX_EXCHANGE_NAME, true, false); } /** * 配置死信隊列 * @return */ @Bean Queue dlxQueue() { return new Queue(DLX_QUEUE_NAME); } /** * 綁定死信隊列和死信交換機(jī) * @return */ @Bean Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()) .to(dlxDirectExchange()) .with(DLX_ROUTING_KEY); }
這其實(shí)跟普通的交換機(jī),普通的消息隊列沒啥兩樣。
接下來為消息隊列配置死信交換機(jī),如下:
@Bean Queue queue() { Map<String, Object> args = new HashMap<>(); //設(shè)置消息過期時間 args.put("x-message-ttl", 0); //設(shè)置死信交換機(jī) args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); //設(shè)置死信 routing_key args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args); }
就兩個參數(shù):
- x-dead-letter-exchange:配置死信交換機(jī)。
- x-dead-letter-routing-key:配置死信
routing_key
。
這就配置好了。
將來發(fā)送到這個消息隊列上的消息,如果發(fā)生了 nack、reject 或者過期等問題,就會被發(fā)送到 DLX 上,進(jìn)而進(jìn)入到與 DLX 綁定的消息隊列上。
死信消息隊列的消費(fèi)和普通消息隊列的消費(fèi)并無二致:
@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME) public void dlxHandle(String msg) { System.out.println("dlx msg = " + msg); }
很容易吧~
4. 小結(jié)
好啦,今天就和小伙伴們聊一聊 RabbitMQ 中的消息過期問題,感興趣的小伙伴可以去試試哦~
參考資料:
到此這篇關(guān)于Java RabbitMQ 中的消息長期不消費(fèi)會過期嗎的文章就介紹到這了,更多相關(guān)Java RabbitMQ 消息過期內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java字節(jié)與字符流永久存儲json數(shù)據(jù)
本篇文章給大家詳細(xì)講述了Java字節(jié)與字符流永久存儲json數(shù)據(jù)的方法,以及代碼分享,有興趣的參考學(xué)習(xí)下。2018-02-02hadoop中實(shí)現(xiàn)java網(wǎng)絡(luò)爬蟲(示例講解)
下面小編就為大家?guī)硪黄猦adoop中實(shí)現(xiàn)java網(wǎng)絡(luò)爬蟲(示例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-09-09詳解Java的Hibernate框架中的搜索工具的運(yùn)用
這篇文章主要介紹了詳解Java的Hibernate框架中的搜索工具的運(yùn)用,Hibernate是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-11-11