RabbitMQ工作模式中的發(fā)布確認(rèn)模式示例詳解
發(fā)布確認(rèn)模式
概述
發(fā)布確認(rèn)模式用于確保消息已經(jīng)被正確地發(fā)送到RabbitMQ服務(wù)器,并被成功接收和持久化。通過使用發(fā)布確認(rèn),生產(chǎn)者可以獲得對消息的可靠性保證,避免消息丟失。這一機(jī)制基于通道(Channel)級別,通過兩個(gè)階段的確認(rèn)來保證消息的可靠性。

消息丟失問題
作為消息中間件, 都會(huì)?臨消息丟失的問題.
消息丟失?概分為三種情況:
1. ?產(chǎn)者問題. 因?yàn)閼?yīng)?程序故障, ?絡(luò)抖動(dòng)等各種原因, ?產(chǎn)者沒有成功向broker發(fā)送消息.
2. 消息中間件??問題. ?產(chǎn)者成功發(fā)送給了Broker, 但是Broker沒有把消息保存好, 導(dǎo)致消息丟失.
3. 消費(fèi)者問題. Broker 發(fā)送消息到消費(fèi)者, 消費(fèi)者在消費(fèi)消息時(shí), 因?yàn)闆]有處理好, 導(dǎo)致broker將消費(fèi)失敗的消息從隊(duì)列中刪除了。

RabbitMQ也對上述問題給出了相應(yīng)的解決?案. 問題2可以通過持久化機(jī)制. 問題3可以采?消息應(yīng)答機(jī)制.
針對問題1, 可以采?發(fā)布確認(rèn)(Publisher Confirms)機(jī)制實(shí)現(xiàn).
發(fā)布確認(rèn)的三種模式
RabbitMQ的發(fā)布確認(rèn)模式主要有三種形式:單條確認(rèn)、批量確認(rèn)和異步確認(rèn)。
單條確認(rèn)(Single Publisher Confirm)
特點(diǎn):在發(fā)布一條消息后,等待服務(wù)器確認(rèn)該消息是否成功接收。
優(yōu)點(diǎn):實(shí)現(xiàn)簡單,每條消息的確認(rèn)狀態(tài)清晰。
缺點(diǎn):性能開銷較大,特別是在高并發(fā)的場景下,因?yàn)槊織l消息都需要等待服務(wù)器的確認(rèn)。
批量確認(rèn)(Batch Publisher Confirm)
特點(diǎn):允許在一次性確認(rèn)多個(gè)消息是否成功被服務(wù)器接收。
優(yōu)點(diǎn):在大量消息的場景中可以提高效率,因?yàn)榭梢詼p少確認(rèn)消息的數(shù)量。
缺點(diǎn):當(dāng)一批消息中有一條消息發(fā)送失敗時(shí),整個(gè)批量確認(rèn)失敗。此時(shí)需要重新發(fā)送整批消息,但不知道是哪條消息發(fā)送失敗,增加了調(diào)試和處理的難度。
異步確認(rèn)(Asynchronous Confirm)
特點(diǎn):通過回調(diào)函數(shù)處理消息的確認(rèn)和未確認(rèn)事件,更加靈活。
優(yōu)點(diǎn):在異步場景中能夠更好地處理消息的狀態(tài),提高了系統(tǒng)的并發(fā)性能和響應(yīng)速度。
缺點(diǎn):實(shí)現(xiàn)相對復(fù)雜,需要處理回調(diào)函數(shù)的邏輯和狀態(tài)管理。
實(shí)現(xiàn)步驟
1.設(shè)置通道為發(fā)布確認(rèn)模式:在生產(chǎn)者發(fā)送消息之前,需要將通道設(shè)置為發(fā)布確認(rèn)模式。這可以通過調(diào)用channel.confirmSelect()方法來實(shí)現(xiàn)。
2.發(fā)送消息并等待確認(rèn):生產(chǎn)者發(fā)送消息時(shí),每條消息都會(huì)分配一個(gè)唯一的、遞增的整數(shù)ID(DeliveryTag)。生產(chǎn)者可以通過調(diào)用channel.waitForConfirms()方法來等待所有已發(fā)送消息的確認(rèn),或者通過其他方式處理確認(rèn)回調(diào)。
3.處理確認(rèn)回調(diào):為了處理確認(rèn)回調(diào),需要?jiǎng)?chuàng)建一個(gè)ConfirmCallback接口的實(shí)現(xiàn)。在實(shí)現(xiàn)的handleAck()方法中,可以處理成功接收到確認(rèn)的消息的邏輯;在handleNack()方法中,可以處理未成功接收到確認(rèn)的消息的邏輯。
應(yīng)用場景
發(fā)布確認(rèn)模式適用于對數(shù)據(jù)安全性要求較高的場景,如金融交易、訂單處理等。在這些場景中,消息的丟失或重復(fù)都可能導(dǎo)致嚴(yán)重的業(yè)務(wù)問題。通過使用發(fā)布確認(rèn)模式,可以確保消息被正確地發(fā)送到RabbitMQ服務(wù)器,并被成功接收和持久化,從而提高了系統(tǒng)的可靠性和穩(wěn)定性。
代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.21.0</version>
</dependency>常量類
public class Constants {
public static final String HOST = "47.98.109.138";
public static final int PORT = 5672;
public static final String USER_NAME = "study";
public static final String PASSWORD = "study";
public static final String VIRTUAL_HOST = "aaa";
//publisher confirms
public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
}單條確認(rèn)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 100;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
connectionFactory.setUsername(Constants.USER_NAME);//賬號
connectionFactory.setPassword(Constants.PASSWORD); //密碼
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機(jī)
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
//Strategy #1: Publishing Messages Individually
//單獨(dú)確認(rèn)
publishingMessagesIndividually();
}
/**
* 單獨(dú)確認(rèn)
*/
private static void publishingMessagesIndividually() throws Exception {
try(Connection connection = createConnection()) {
//1. 開啟信道
Channel channel = connection.createChannel();
//2. 設(shè)置信道為confirm模式
channel.confirmSelect();
//3. 聲明隊(duì)列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
//4. 發(fā)送消息, 并等待確認(rèn)
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms"+i;
channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
//等待確認(rèn)
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("單獨(dú)確認(rèn)策略, 消息條數(shù): %d, 耗時(shí): %d ms \n",MESSAGE_COUNT, end-start);
}
}
}運(yùn)行代碼

我們可以看到,以發(fā)送消息條數(shù)為100條為例,單條確認(rèn)模式是非常耗時(shí)的。
批量確認(rèn)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 10000;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
connectionFactory.setUsername(Constants.USER_NAME);//賬號
connectionFactory.setPassword(Constants.PASSWORD); //密碼
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機(jī)
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
//Strategy #2: Publishing Messages in Batches
//批量確認(rèn)
publishingMessagesInBatches();
}
/**
* 批量確認(rèn)
* @throws Exception
*/
private static void publishingMessagesInBatches() throws Exception{
try(Connection connection = createConnection()) {
//1. 開啟信道
Channel channel = connection.createChannel();
//2. 設(shè)置信道為confirm模式
channel.confirmSelect();
//3. 聲明隊(duì)列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
//4. 發(fā)送消息, 并進(jìn)行確認(rèn)
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms"+i;
channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount==batchSize){
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount>0){
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("批量確認(rèn)策略, 消息條數(shù): %d, 耗時(shí): %d ms \n",MESSAGE_COUNT, end-start);
}
}
}運(yùn)行代碼

我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認(rèn)模式是比較快的。
異步確認(rèn)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 10000;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
connectionFactory.setUsername(Constants.USER_NAME);//賬號
connectionFactory.setPassword(Constants.PASSWORD); //密碼
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機(jī)
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
//Strategy #3: Handling Publisher Confirms Asynchronously
//異步確認(rèn)
handlingPublisherConfirmsAsynchronously();
}
/**
* 異步確認(rèn)
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception{
try (Connection connection = createConnection()){
//1. 開啟信道
Channel channel = connection.createChannel();
//2. 設(shè)置信道為confirm模式
channel.confirmSelect();
//3. 聲明隊(duì)列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
//4. 監(jiān)聽confirm
//集合中存儲(chǔ)的是未確認(rèn)的消息ID
long start = System.currentTimeMillis();
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
//業(yè)務(wù)需要根據(jù)實(shí)際場景進(jìn)行處理, 比如重發(fā), 此處代碼省略
}
});
//5. 發(fā)送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms"+i;
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
confirmSeqNo.add(seqNo);
}
while (!confirmSeqNo.isEmpty()){
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("異步確認(rèn)策略, 消息條數(shù): %d, 耗時(shí): %d ms \n",MESSAGE_COUNT, end-start);
}
}
}運(yùn)行代碼

我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認(rèn)模式是非??斓?。
對比批量確認(rèn)和異步確認(rèn)模式

我們可以看到,異步確認(rèn)模式是比批量確認(rèn)模式快很多的。
到此這篇關(guān)于RabbitMQ工作模式之發(fā)布確認(rèn)模式的文章就介紹到這了,更多相關(guān)RabbitMQ發(fā)布確認(rèn)模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中?Jackson?日期的時(shí)區(qū)和日期格式問題解決
因?yàn)樽罱?xiàng)目需要國際化,需要能夠支持多種國際化語言,目前需要支持三種(法語、英語、簡體中文),這篇文章主要介紹了SpringBoot中?Jackson?日期的時(shí)區(qū)和日期格式問題,需要的朋友可以參考下2022-12-12
Java?FTP協(xié)議實(shí)現(xiàn)文件下載功能
FTP(File?Transfer?Protocol)就是文件傳輸協(xié)議。通過FTP客戶端從遠(yuǎn)程FTP服務(wù)器上拷貝文件到本地計(jì)算機(jī)稱為下載,將本地計(jì)算機(jī)上的文件復(fù)制到遠(yuǎn)程FTP服務(wù)器上稱為上傳,上傳和下載是FTP最常用的兩個(gè)功能2022-11-11
SpringBoot集成本地緩存性能之王Caffeine示例詳解
這篇文章主要為大家介紹了SpringBoot集成本地緩存性能之王Caffeine的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
Java中零拷貝和深拷貝的原理及實(shí)現(xiàn)探究(代碼示例)
深拷貝和零拷貝是兩個(gè)在 Java 中廣泛使用的概念,它們分別用于對象復(fù)制和數(shù)據(jù)傳輸優(yōu)化,下面將詳細(xì)介紹這兩個(gè)概念的原理,并給出相應(yīng)的 Java 代碼示例,感興趣的朋友一起看看吧2023-12-12
Java 為什么要避免使用finalizer和Cleaner
這篇文章主要介紹了Java 為什么要避免使用finalizer和Cleaner,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下2021-03-03
Spring 校驗(yàn)(validator,JSR-303)簡單實(shí)現(xiàn)方式
這篇文章主要介紹了Spring 校驗(yàn)(validator,JSR-303)簡單實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10

