RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解
1. 概述
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)問(wèn)題啥的,都 有可能。此時(shí)可以開(kāi)啟 confirm 模式,在生產(chǎn)者那里設(shè)置開(kāi)啟 confirm 模式之后,你每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id,然后如果寫(xiě)入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息,告訴你說(shuō)這個(gè)消息 ok 了。如果RabbitMQ 沒(méi)能處理這個(gè)消息,會(huì)回調(diào)你的一個(gè) nack 接口,告訴你這個(gè)消息接收失敗,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài),如果超過(guò)一定時(shí)間還 沒(méi)接收到這個(gè)消息的回調(diào),那么你可以重發(fā)。

在實(shí)際項(xiàng)目中,可以利用這一機(jī)制保障消息的可靠性投遞,如果消息未發(fā)送成功,可以在監(jiān)聽(tīng)事件中記錄日志、重新發(fā)送消息等操作。
2.原生API中開(kāi)啟Confirm消息確認(rèn)機(jī)制
- 在生產(chǎn)者的channel上開(kāi)啟確認(rèn)機(jī)制: channel.confirmSelect();
- 在channel上添加Confirm監(jiān)聽(tīng)事件: channel.addConfirmListener(new ConfirmListener() ...
2.1 代碼演示
生產(chǎn)者代碼
監(jiān)聽(tīng)事件的兩個(gè)方法:handleAck() 消息投遞成功后回調(diào),handleNack 消息未成功投遞回調(diào)
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/");
//創(chuàng)建一個(gè)鏈接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建channel
Channel channel = connection.createChannel();
//消息的確認(rèn)模式
channel.confirmSelect();
String exchangeName="test_confirm_exchange";
String routeKey="confirm.test";
String msg="RabbitMQ send message confirm test!";
for (int i=0;i<5;i++){
channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
}
//確定監(jiān)聽(tīng)事件
channel.addConfirmListener(new ConfirmListener() {
/**
* 消息成功發(fā)送
* @param deliveryTag 消息唯一標(biāo)簽
* @param multiple 是否批量
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("**********Ack*********");
}
/**
* 消息沒(méi)有成功發(fā)送
* @param deliveryTag
* @param multiple
* @throws IOException
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("**********No Ack*********");
}
});
}消費(fèi)者端代碼
public static void main(String[] args) throws Exception{
System.out.println("======消息接收start==========");
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/");
//創(chuàng)建鏈接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建channel
Channel channel = connection.createChannel();
String exchangeName="test_confirm_exchange";
String exchangeType="topic";
//聲明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
String queueName="test_confirm_queue";
//聲明隊(duì)列
channel.queueDeclare(queueName,true,false,false,null);
String routeKey="confirm.#";
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queueName,exchangeName,routeKey);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息::"+new String(body));
}
});
}到此這篇關(guān)于RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解的文章就介紹到這了,更多相關(guān)Confirm消息確認(rèn)機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
圖文教程教你IDEA中的Spring環(huán)境搭建+簡(jiǎn)單入門(mén)
這篇文章主要介紹了圖文教程教你IDEA中的Spring環(huán)境搭建+簡(jiǎn)單入門(mén),Spring的環(huán)境搭建使用Maven更加方便,需要的朋友可以參考下2023-03-03
Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類詳解
這篇文章主要給大家介紹了關(guān)于Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫(xiě)方法
本篇文章主要介紹了Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫(xiě)方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-02-02
Java搭建簡(jiǎn)單Netty開(kāi)發(fā)環(huán)境入門(mén)教程
這篇文章主要介紹了Java搭建簡(jiǎn)單Netty開(kāi)發(fā)環(huán)境入門(mén)教程,有詳細(xì)的代碼展示和maven依賴,能夠幫助你快速上手Netty開(kāi)發(fā)框架,需要的朋友可以參考下2021-06-06
基于MyBatis的parameterType傳入?yún)?shù)類型
這篇文章主要介紹了基于MyBatis的parameterType傳入?yún)?shù)類型,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
java.imageIo給圖片添加水印的實(shí)現(xiàn)代碼
最近項(xiàng)目在做一個(gè)商城項(xiàng)目, 項(xiàng)目上的圖片要添加水?、?添加圖片水印;②:添加文字水印;一下提供下個(gè)方法,希望大家可以用得著2013-07-07
Java接口請(qǐng)求重試機(jī)制的幾種常見(jiàn)方法
Java接口請(qǐng)求重試機(jī)制是保證系統(tǒng)穩(wěn)定性和容錯(cuò)能力的重要手段之一,當(dāng)接口請(qǐng)求發(fā)生失敗或暫時(shí)性錯(cuò)誤時(shí),通過(guò)重試機(jī)制可以提高請(qǐng)求的成功率,本文將詳細(xì)介紹Java接口請(qǐng)求重試機(jī)制的幾種常見(jiàn)方法,需要的朋友可以參考下2023-11-11
淺析Spring IOC bean為什么默認(rèn)是單例
單例的意思就是說(shuō)在 Spring IoC 容器中只會(huì)存在一個(gè) bean 的實(shí)例,無(wú)論一次調(diào)用還是多次調(diào)用,始終指向的都是同一個(gè) bean 對(duì)象,本文小編將和大家一起分析Spring IOC bean為什么默認(rèn)是單例,需要的朋友可以參考下2023-12-12

