Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
概述
RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),使用erlang語言開發(fā)。為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到RabbitMQ的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中。但由于對(duì)死信隊(duì)列的概念及配置不熟悉,導(dǎo)致曾一度陷入百度的汪洋大海,無法自拔,很多文章都看起來可行,但是實(shí)際上卻并不能幫我解決實(shí)際問題。最終,在官網(wǎng)文檔中找到了我想要的答案,通過官網(wǎng)文檔的學(xué)習(xí),才發(fā)現(xiàn)對(duì)于死信隊(duì)列存在一些誤解,導(dǎo)致配置死信隊(duì)列之路困難重重。
詳細(xì)
一、運(yùn)行效果


二、實(shí)現(xiàn)過程
①、先創(chuàng)建一個(gè)Springboot項(xiàng)目。然后在pom文件中添加 spring-boot-starter-amqp 和 spring-boot-starter-web 的依賴,接下來創(chuàng)建一個(gè)Config類,這里是關(guān)鍵:
package com.zyf.rabbitmqdeadletterdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
// 聲明業(yè)務(wù)Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明業(yè)務(wù)隊(duì)列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 聲明業(yè)務(wù)隊(duì)列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 聲明死信隊(duì)列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 聲明死信隊(duì)列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 聲明業(yè)務(wù)隊(duì)列A綁定關(guān)系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明死信隊(duì)列A綁定關(guān)系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 聲明死信隊(duì)列B綁定關(guān)系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}②、接下來,是業(yè)務(wù)隊(duì)列的消費(fèi)代碼:
@Slf4j@Componentpublic class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業(yè)務(wù)消息A:{}", msg); boolean ack = true;
Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
} if (!ack){
log.error("消息消費(fèi)發(fā)生異常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到業(yè)務(wù)消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}③、然后配置死信隊(duì)列的消費(fèi)者:
@Componentpublic class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}④、為了方便測(cè)試,寫一個(gè)簡(jiǎn)單的消息生產(chǎn)者,并通過controller層來生產(chǎn)消息。
@Componentpublic class BusinessMessageSender { @Autowired
private RabbitTemplate rabbitTemplate; public void sendMsg(String msg){
rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
}
}@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController { @Autowired
private BusinessMessageSender sender; @RequestMapping("sendmsg")
public void sendMsg(String msg){
sender.sendMsg(msg);
}
}三、項(xiàng)目結(jié)構(gòu)圖

四、補(bǔ)充總結(jié)
死信隊(duì)列其實(shí)并沒有什么神秘的地方,不過是綁定在死信交換機(jī)上的普通隊(duì)列,而死信交換機(jī)也只是一個(gè)普通的交換機(jī),不過是用來專門處理死信的交換機(jī)。
總結(jié)一下死信消息的生命周期:
- 業(yè)務(wù)消息被投入業(yè)務(wù)隊(duì)列
- 消費(fèi)者消費(fèi)業(yè)務(wù)隊(duì)列的消息,由于處理過程中發(fā)生異常,于是進(jìn)行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投遞到死信交換機(jī)中
- 死信交換機(jī)將消息投入相應(yīng)的死信隊(duì)列
- 死信隊(duì)列的消費(fèi)者消費(fèi)死信消息
死信消息是RabbitMQ為我們做的一層保證,其實(shí)我們也可以不使用死信隊(duì)列,而是在消息消費(fèi)異常時(shí),將消息主動(dòng)投遞到另一個(gè)交換機(jī)中,當(dāng)你明白了這些之后,這些Exchange和Queue想怎樣配合就能怎么配合。比如從死信隊(duì)列拉取消息,然后發(fā)送郵件、短信、釘釘通知來通知開發(fā)人員關(guān)注?;蛘邔⑾⒅匦峦哆f到一個(gè)隊(duì)列然后設(shè)置過期時(shí)間,來進(jìn)行延時(shí)消費(fèi)。
到此這篇關(guān)于Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列的文章就介紹到這了,更多相關(guān)Springboot rabbitmq死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Mybatis 高級(jí)用法和tk.mybatis使用示例詳解
tkmybatis 是對(duì)底層 sql 進(jìn)行了抽象封裝,不需要考慮 sql 怎么寫,只需要按照邏輯思維,遵循 tkmybatis 的語法即可實(shí)現(xiàn)數(shù)據(jù)庫操作,這篇文章主要介紹了Mybatis 高級(jí)用法和tk.mybatis使用,需要的朋友可以參考下2024-05-05
Java中如何快速構(gòu)建項(xiàng)目腳手架的實(shí)現(xiàn)
這篇文章主要介紹了Java中如何快速構(gòu)建項(xiàng)目腳手架,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05
java階乘計(jì)算獲得結(jié)果末尾0的個(gè)數(shù)代碼實(shí)現(xiàn)
今天偶然看到一個(gè)要求,求1000~10000之間的數(shù)n的階乘并計(jì)算所得的數(shù)n!末尾有多少個(gè)0?要求: 不計(jì)算 只要得到末尾有多少個(gè)0就可以了,看下面的代碼吧2013-12-12
Java創(chuàng)建對(duì)象(顯式創(chuàng)建和隱含創(chuàng)建)
本文詳細(xì)介紹對(duì)象的創(chuàng)建,在 Java 語言中創(chuàng)建對(duì)象分顯式創(chuàng)建與隱含創(chuàng)建兩種情況,顯式創(chuàng)建和隱含創(chuàng)建,,需要的朋友可以參考下面文章的具體內(nèi)容2021-09-09

