SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列
簡(jiǎn)介
說明
本文用示例介紹SpringBoot整合RabbitMQ時(shí)如何處理死信隊(duì)列/延遲隊(duì)列。
RabbitMQ消息簡(jiǎn)介
RabbitMQ的消息默認(rèn)不會(huì)超時(shí)。
什么是死信隊(duì)列?什么是延遲隊(duì)列?
死信隊(duì)列:
DLX,全稱為Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新被發(fā)送到另一個(gè)交換器中,這個(gè)交換器就是DLX,綁定DLX的隊(duì)列就稱之為死信隊(duì)列。
以下幾種情況會(huì)導(dǎo)致消息變成死信:
- 消息被拒絕(Basic.Reject/Basic.Nack),并且設(shè)置requeue參數(shù)為false;
- 消息過期;
- 隊(duì)列達(dá)到最大長(zhǎng)度。
延遲隊(duì)列:
延遲隊(duì)列用來存放延遲消息。延遲消息:指當(dāng)消息被發(fā)送以后,不想讓消費(fèi)者立刻拿到消息,而是等待特定時(shí)間后,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi)。
相關(guān)網(wǎng)址
詳解RabbitMQ中死信隊(duì)列和延遲隊(duì)列的使用詳解
實(shí)例代碼
路由配置
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitRouterConfig {
public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute";
public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay";
public static final String ROUTINGKEY_HELLOS = "hello.#";
public static final String ROUTINGKEY_DELAY = "delay.#";
public static final String QUEUE_HELLO = "Queue@hello";
public static final String QUEUE_HI = "Queue@hi";
public static final String QUEUE_UNROUTE = "Queue@unroute";
public static final String QUEUE_DELAY = "Queue@delay";
public static final Integer TTL_QUEUE_MESSAGE = 5000;
@Autowired
AmqpAdmin amqpAdmin;
@Bean
Object initBindingTest() {
amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());
amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());
amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)
.durable(true)
.autoDelete()
.withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)
.build());
amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());
amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)
.withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)
.withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)
.withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)
.build());
amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());
amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());
amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,
EXCHANGE_FANOUT_UNROUTE, "", null));
amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,
EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));
return new Object();
}
}控制器
package com.example.controller;
import com.example.config.RabbitRouterConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@RestController
public class HelloController {
@Autowired
private Sender sender;
@PostMapping("/hi")
public void hi() {
sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
}
@PostMapping("/hello1")
public void hello1() {
sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
}
@PostMapping("/hello2")
public void hello2() {
sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());
}
@PostMapping("/ae")
public void aeTest() {
sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());
}
}發(fā)送器
package com.example.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String routingKey, String message) {
this.rabbitTemplate.convertAndSend(routingKey, message);
}
public void send(String exchange, String routingKey, String message) {
this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}接收器
package com.example.mq;
import com.example.config.RabbitRouterConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
public void hi(String payload) {
System.out.println ("Receiver(hi) : " + payload);
}
// @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
// public void hello(String hello) throws InterruptedException {
// System.out.println ("Receiver(hello) : " + hello);
// Thread.sleep(5 * 1000);
// System.out.println("(hello):sleep over");
// }
//
// @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
// public void unroute(String hello) throws InterruptedException {
// System.out.println ("Receiver(unroute) : " + hello);
// Thread.sleep(5 * 1000);
// System.out.println("(unroute):sleep over");
// }
@RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)
public void delay(String hello) throws InterruptedException {
System.out.println ("Receiver(delay) : " + hello);
Thread.sleep(5 * 1000);
System.out.println("(delay):sleep over");
}
}application.yml
server:
# port: 9100
port: 9101
spring:
application:
# name: demo-rabbitmq-sender
name: demo-rabbitmq-receiver
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
# virtualHost: /
publisher-confirms: true
publisher-returns: true
# listener:
# simple:
# acknowledge-mode: manual
# direct:
# acknowledge-mode: manual實(shí)例測(cè)試
分別啟動(dòng)發(fā)送者和接收者。
訪問:http://localhost:9100/hello2
五秒鐘后輸出:
Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over
以上就是SpringBoot整合RabbitMQ處理死信隊(duì)列和延遲隊(duì)列的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ死信隊(duì)列 延遲隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot+RabbitMQ?實(shí)現(xiàn)死信隊(duì)列的示例
- 如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息
- 深入分析RabbitMQ中死信隊(duì)列與死信交換機(jī)
- 關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)死信隊(duì)列
- 關(guān)于Rabbitmq死信隊(duì)列及延時(shí)隊(duì)列的實(shí)現(xiàn)
- Springboot結(jié)合rabbitmq實(shí)現(xiàn)的死信隊(duì)列
- RabbitMQ之死信隊(duì)列深入解析
- springboot中RabbitMQ死信隊(duì)列的實(shí)現(xiàn)示例
- SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
相關(guān)文章
springboot yml配置文件定義list集合、數(shù)組和map以及使用中的錯(cuò)誤
這篇文章主要介紹了springboot yml配置文件定義list集合、數(shù)組和map以及使用中遇到的錯(cuò)誤問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
mybatis 根據(jù)id批量刪除的實(shí)現(xiàn)操作
這篇文章主要介紹了mybatis 根據(jù)id批量刪除的實(shí)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08
java仿Servlet生成驗(yàn)證碼實(shí)例詳解
這篇文章主要介紹了java仿Servlet生成驗(yàn)證碼實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-04-04
繼承WebMvcConfigurationSupport后自動(dòng)配置不生效及如何配置攔截器
這篇文章主要介紹了繼承WebMvcConfigurationSupport后自動(dòng)配置不生效及如何配置攔截器,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
零基礎(chǔ)學(xué)Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個(gè)Java項(xiàng)目及Eclipse的一些基礎(chǔ)使用技巧
這篇文章主要介紹了零基礎(chǔ)學(xué)Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個(gè)Java項(xiàng)目及Eclipse的一些基礎(chǔ)使用技巧,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09
spring boot security自定義認(rèn)證的代碼示例
這篇文章主要介紹了spring boot security自定義認(rèn)證,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07
idea右鍵沒有java class選項(xiàng)問題解決方案
這篇文章主要介紹了idea右鍵沒有java class選項(xiàng)問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04

