springboot中rabbitmq實現(xiàn)消息可靠性機(jī)制詳解
1. 生產(chǎn)者模塊通過publisher confirm機(jī)制實現(xiàn)消息可靠性
1.1 生產(chǎn)者模塊導(dǎo)入rabbitmq相關(guān)依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--用于mq消息的序列化與反序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
1.2 配置文件中進(jìn)行mq的相關(guān)配置
spring.rabbitmq.host=10.128.240.183
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
- publish-confirm-type:開啟publisher-confirm,有以下可選值
simple:同步等待confirm結(jié)果,直到超時
correlated:異步回調(diào),定義ConfirmCallback。mq返回結(jié)果時會回調(diào)這個ConfirmCallback
- publish-returns:開啟publish-return功能??梢远xReturnCallback
- template.mandatory: 定義消息路由失敗的策略
true:調(diào)用ReturnCallback
false:直接丟棄消息
1.3 定義ReturnCallback(消息投遞到隊列失敗觸發(fā)此回調(diào))
- 每個RabbitTemplate只能配置一個ReturnCallback。
- 當(dāng)消息投遞失敗,就會調(diào)用生產(chǎn)者的returnCallback中定義的處理邏輯
- 可以在容器啟動時就配置這個回調(diào)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一個延遲消息,忽略這個錯誤提示
return;
}
// 記錄日志
log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的話,重發(fā)消息
});
}
}
1.4 定義ConfirmCallback(消息到達(dá)交換機(jī)觸發(fā)此回調(diào))
可以為redisTemplate指定一個統(tǒng)一的確認(rèn)回調(diào)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一個延遲消息,忽略這個錯誤提示
return;
}
// 記錄日志
log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的話,重發(fā)消息
});
// 設(shè)置統(tǒng)一的confirm回調(diào)。只要消息到達(dá)broker就ack=true
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("這是統(tǒng)一的回調(diào)");
System.out.println("correlationData:" + correlationData);
System.out.println("ack:" + b);
System.out.println("cause:" + s);
}
});
}
}
也可以為特定的消息定制回調(diào)
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testmq() throws InterruptedException {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(result->{
if (result.isAck()) {
// ACK
log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId());
// 重發(fā)消息
}
},ex->{
// 記錄日志
log.error("消息發(fā)送失??!", ex);
// 重發(fā)消息
});
rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData);
}
2. 消費者模塊開啟消息確認(rèn)
2.1 添加配置
# 手動ack消息,不使用默認(rèn)的消費端確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual
- none:關(guān)閉ack,消息投遞時不可靠的,可能丟失
- auto:類似事務(wù)機(jī)制,出現(xiàn)異常時返回nack,消息回滾到mq,沒有異常,返回
- ackmanual:我們自己指定什么時候返回ack
2.2 manual模式在監(jiān)聽器中自定義返回ack
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
@Autowired
private OrderService orderService;
@RabbitHandler
private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到過期的訂單信息,準(zhǔn)備關(guān)閉訂單" + orderEntity.getOrderSn());
try {
orderService.closeOrder(orderEntity);
// 第二個參數(shù)為false則表示僅確認(rèn)此條消息。如果為true則表示對收到的多條消息同時確認(rèn)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 第二個參數(shù)為ture表示將這個消息重新加入隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
3. 消費者模塊開啟消息失敗重試機(jī)制
3.1 配置文件添加配置,開啟本地重試
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
- 開啟本地重試,如果消息處理過程總拋出異常,不會requeue到隊列,而是在消費者本地重試
- 重試達(dá)到最大次數(shù)后,spring會返回ack,消息會被丟棄
4. 消費者模塊添加失敗策略(用于開啟失敗本地重試功能后)
- 當(dāng)開啟本地重試后,重試最大次數(shù)后消息直接丟棄。
- 三種策略,都繼承于MessageRecovery接口
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
4.2 定義處理失敗消息的交換機(jī)和隊列 沒有會自動創(chuàng)建相應(yīng)的隊列、交換機(jī)與綁定關(guān)系,有了就啥也不做
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
// 路由鍵為key
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
4.3 向容器中添加一個失敗策略組件
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
// error為路由鍵
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
到此這篇關(guān)于springboot中rabbitmq實現(xiàn)消息可靠性的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息可靠性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實踐
這篇文章主要介紹了基于Java和GeoTools的Shapefile矢量數(shù)據(jù)縮略圖生成實踐,需要的朋友可以參考下2024-08-08
Java?@Scheduled定時任務(wù)不執(zhí)行解決辦法
這篇文章主要給大家介紹了關(guān)于Java?@Scheduled定時任務(wù)不執(zhí)行解決的相關(guān)資料,當(dāng)@Scheduled定時任務(wù)不執(zhí)行時可以根據(jù)以下步驟進(jìn)行排查和解決,需要的朋友可以參考下2023-10-10
spring boot配置ssl(多cer格式)超詳細(xì)教程
這篇文章主要介紹了spring boot配置ssl(多cer格式)超詳細(xì)教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-11-11
Java Web項目部署在Tomcat運行出錯與解決方法示例
這篇文章主要介紹了Java Web項目部署在Tomcat運行出錯與解決方法,結(jié)合具體實例形式分析了Java Web項目部署在Tomcat過程中由于xml配置文件導(dǎo)致的錯誤問題常見提示與解決方法,需要的朋友可以參考下2017-03-03

