RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解
1. 概述
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)問題啥的,都 有可能。此時可以開啟 confirm 模式,在生產(chǎn)者那里設(shè)置開啟 confirm 模式之后,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了。如果RabbitMQ 沒能處理這個消息,會回調(diào)你的一個 nack 接口,告訴你這個消息接收失敗,你可以重試。而且你可以結(jié)合這個機(jī)制自己在內(nèi)存里維護(hù)每個消息 id 的狀態(tài),如果超過一定時間還 沒接收到這個消息的回調(diào),那么你可以重發(fā)。
在實(shí)際項(xiàng)目中,可以利用這一機(jī)制保障消息的可靠性投遞,如果消息未發(fā)送成功,可以在監(jiān)聽事件中記錄日志、重新發(fā)送消息等操作。
2.原生API中開啟Confirm消息確認(rèn)機(jī)制
- 在生產(chǎn)者的channel上開啟確認(rèn)機(jī)制: channel.confirmSelect();
- 在channel上添加Confirm監(jiān)聽事件: channel.addConfirmListener(new ConfirmListener() ...
2.1 代碼演示
生產(chǎn)者代碼
監(jiān)聽事件的兩個方法:handleAck() 消息投遞成功后回調(diào),handleNack 消息未成功投遞回調(diào)
public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬主機(jī) connectionFactory.setVirtualHost("/"); //創(chuàng)建一個鏈接 Connection connection = connectionFactory.newConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); //消息的確認(rèn)模式 channel.confirmSelect(); String exchangeName="test_confirm_exchange"; String routeKey="confirm.test"; String msg="RabbitMQ send message confirm test!"; for (int i=0;i<5;i++){ channel.basicPublish(exchangeName,routeKey,null,msg.getBytes()); } //確定監(jiān)聽事件 channel.addConfirmListener(new ConfirmListener() { /** * 消息成功發(fā)送 * @param deliveryTag 消息唯一標(biāo)簽 * @param multiple 是否批量 * @throws IOException */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("**********Ack*********"); } /** * 消息沒有成功發(fā)送 * @param deliveryTag * @param multiple * @throws IOException */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("**********No Ack*********"); } }); }
消費(fèi)者端代碼
public static void main(String[] args) throws Exception{ System.out.println("======消息接收start=========="); ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬主機(jī) connectionFactory.setVirtualHost("/"); //創(chuàng)建鏈接 Connection connection = connectionFactory.newConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); String exchangeName="test_confirm_exchange"; String exchangeType="topic"; //聲明Exchange channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); String queueName="test_confirm_queue"; //聲明隊(duì)列 channel.queueDeclare(queueName,true,false,false,null); String routeKey="confirm.#"; //綁定隊(duì)列和交換機(jī) channel.queueBind(queueName,exchangeName,routeKey); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); } }); }
到此這篇關(guān)于RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解的文章就介紹到這了,更多相關(guān)Confirm消息確認(rèn)機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
圖文教程教你IDEA中的Spring環(huán)境搭建+簡單入門
這篇文章主要介紹了圖文教程教你IDEA中的Spring環(huán)境搭建+簡單入門,Spring的環(huán)境搭建使用Maven更加方便,需要的朋友可以參考下2023-03-03Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類詳解
這篇文章主要給大家介紹了關(guān)于Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法
本篇文章主要介紹了Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02Java搭建簡單Netty開發(fā)環(huán)境入門教程
這篇文章主要介紹了Java搭建簡單Netty開發(fā)環(huán)境入門教程,有詳細(xì)的代碼展示和maven依賴,能夠幫助你快速上手Netty開發(fā)框架,需要的朋友可以參考下2021-06-06基于MyBatis的parameterType傳入?yún)?shù)類型
這篇文章主要介紹了基于MyBatis的parameterType傳入?yún)?shù)類型,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09java.imageIo給圖片添加水印的實(shí)現(xiàn)代碼
最近項(xiàng)目在做一個商城項(xiàng)目, 項(xiàng)目上的圖片要添加水?、?添加圖片水印;②:添加文字水印;一下提供下個方法,希望大家可以用得著2013-07-07淺析Spring IOC bean為什么默認(rèn)是單例
單例的意思就是說在 Spring IoC 容器中只會存在一個 bean 的實(shí)例,無論一次調(diào)用還是多次調(diào)用,始終指向的都是同一個 bean 對象,本文小編將和大家一起分析Spring IOC bean為什么默認(rèn)是單例,需要的朋友可以參考下2023-12-12