Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
概述
RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),使用erlang語言開發(fā)。為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到RabbitMQ的死信隊(duì)列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時(shí),將消息投入死信隊(duì)列中。但由于對(duì)死信隊(duì)列的概念及配置不熟悉,導(dǎo)致曾一度陷入百度的汪洋大海,無法自拔,很多文章都看起來可行,但是實(shí)際上卻并不能幫我解決實(shí)際問題。最終,在官網(wǎng)文檔中找到了我想要的答案,通過官網(wǎng)文檔的學(xué)習(xí),才發(fā)現(xiàn)對(duì)于死信隊(duì)列存在一些誤解,導(dǎo)致配置死信隊(duì)列之路困難重重。
詳細(xì)
一、運(yùn)行效果
二、實(shí)現(xiàn)過程
①、先創(chuàng)建一個(gè)Springboot項(xiàng)目。然后在pom文件中添加 spring-boot-starter-amqp
和 spring-boot-starter-web
的依賴,接下來創(chuàng)建一個(gè)Config類,這里是關(guān)鍵:
package com.zyf.rabbitmqdeadletterdemo.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA"; public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB"; public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA"; public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB"; // 聲明業(yè)務(wù)Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明業(yè)務(wù)隊(duì)列A @Bean("businessQueueA") public Queue businessQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 聲明業(yè)務(wù)隊(duì)列B @Bean("businessQueueB") public Queue businessQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build(); } // 聲明死信隊(duì)列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 聲明死信隊(duì)列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 聲明業(yè)務(wù)隊(duì)列A綁定關(guān)系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系 @Bean public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 聲明死信隊(duì)列A綁定關(guān)系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 聲明死信隊(duì)列B綁定關(guān)系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }
②、接下來,是業(yè)務(wù)隊(duì)列的消費(fèi)代碼:
@Slf4j@Componentpublic class BusinessMessageReceiver { @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到業(yè)務(wù)消息A:{}", msg); boolean ack = true; Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception"); } } catch (Exception e){ ack = false; exception = e; } if (!ack){ log.error("消息消費(fèi)發(fā)生異常,error msg:{}", exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } @RabbitListener(queues = BUSINESS_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到業(yè)務(wù)消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
③、然后配置死信隊(duì)列的消費(fèi)者:
@Componentpublic class DeadLetterMessageReceiver { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { System.out.println("收到死信消息A:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { System.out.println("收到死信消息B:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
④、為了方便測(cè)試,寫一個(gè)簡(jiǎn)單的消息生產(chǎn)者,并通過controller層來生產(chǎn)消息。
@Componentpublic class BusinessMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg){ rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg); } }
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController { @Autowired private BusinessMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg){ sender.sendMsg(msg); } }
三、項(xiàng)目結(jié)構(gòu)圖
四、補(bǔ)充總結(jié)
死信隊(duì)列其實(shí)并沒有什么神秘的地方,不過是綁定在死信交換機(jī)上的普通隊(duì)列,而死信交換機(jī)也只是一個(gè)普通的交換機(jī),不過是用來專門處理死信的交換機(jī)。
總結(jié)一下死信消息的生命周期:
- 業(yè)務(wù)消息被投入業(yè)務(wù)隊(duì)列
- 消費(fèi)者消費(fèi)業(yè)務(wù)隊(duì)列的消息,由于處理過程中發(fā)生異常,于是進(jìn)行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投遞到死信交換機(jī)中
- 死信交換機(jī)將消息投入相應(yīng)的死信隊(duì)列
- 死信隊(duì)列的消費(fèi)者消費(fèi)死信消息
死信消息是RabbitMQ為我們做的一層保證,其實(shí)我們也可以不使用死信隊(duì)列,而是在消息消費(fèi)異常時(shí),將消息主動(dòng)投遞到另一個(gè)交換機(jī)中,當(dāng)你明白了這些之后,這些Exchange和Queue想怎樣配合就能怎么配合。比如從死信隊(duì)列拉取消息,然后發(fā)送郵件、短信、釘釘通知來通知開發(fā)人員關(guān)注?;蛘邔⑾⒅匦峦哆f到一個(gè)隊(duì)列然后設(shè)置過期時(shí)間,來進(jìn)行延時(shí)消費(fèi)。
到此這篇關(guān)于Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列的文章就介紹到這了,更多相關(guān)Springboot rabbitmq死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
Mybatis 高級(jí)用法和tk.mybatis使用示例詳解
tkmybatis 是對(duì)底層 sql 進(jìn)行了抽象封裝,不需要考慮 sql 怎么寫,只需要按照邏輯思維,遵循 tkmybatis 的語法即可實(shí)現(xiàn)數(shù)據(jù)庫操作,這篇文章主要介紹了Mybatis 高級(jí)用法和tk.mybatis使用,需要的朋友可以參考下2024-05-05Java中如何快速構(gòu)建項(xiàng)目腳手架的實(shí)現(xiàn)
這篇文章主要介紹了Java中如何快速構(gòu)建項(xiàng)目腳手架,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05java階乘計(jì)算獲得結(jié)果末尾0的個(gè)數(shù)代碼實(shí)現(xiàn)
今天偶然看到一個(gè)要求,求1000~10000之間的數(shù)n的階乘并計(jì)算所得的數(shù)n!末尾有多少個(gè)0?要求: 不計(jì)算 只要得到末尾有多少個(gè)0就可以了,看下面的代碼吧2013-12-12Java創(chuàng)建對(duì)象(顯式創(chuàng)建和隱含創(chuàng)建)
本文詳細(xì)介紹對(duì)象的創(chuàng)建,在 Java 語言中創(chuàng)建對(duì)象分顯式創(chuàng)建與隱含創(chuàng)建兩種情況,顯式創(chuàng)建和隱含創(chuàng)建,,需要的朋友可以參考下面文章的具體內(nèi)容2021-09-09