欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成RabbitMQ的方法(死信隊(duì)列)

 更新時(shí)間:2019年05月01日 11:21:27   作者:小揪揪  
這篇文章主要介紹了SpringBoot集成RabbitMQ的方法(死信隊(duì)列),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

介紹

死信隊(duì)列:沒有被及時(shí)消費(fèi)的消息存放的隊(duì)列,消息沒有被及時(shí)消費(fèi)有以下幾點(diǎn)原因:
1.有消息被拒絕(basic.reject/ basic.nack)并且requeue=false
2.隊(duì)列達(dá)到最大長(zhǎng)度
3.消息TTL過期

場(chǎng)景

1.小時(shí)進(jìn)入初始隊(duì)列,等待30分鐘后進(jìn)入5分鐘隊(duì)列
2.消息等待5分鐘后進(jìn)入執(zhí)行隊(duì)列
3.執(zhí)行失敗后重新回到5分鐘隊(duì)列
4.失敗5次后,消息進(jìn)入2小時(shí)隊(duì)列
5.消息等待2小時(shí)進(jìn)入執(zhí)行隊(duì)列
6.失敗5次后,將消息丟棄或做其他處理

使用

安裝MQ

使用docker方式安裝,選擇帶mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

訪問 localhost: 15672,默認(rèn)賬號(hào)密碼guest/guest

項(xiàng)目配置

(1)創(chuàng)建springboot項(xiàng)目
(2)在application.properties配置文件中配置mq連接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)隊(duì)列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  //time
  @Value("${spring.df.buffered.min:120}")
  private int springdfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springdfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springdfLowBufferedTime;

  // 30min Buffered Queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springdfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springdfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springdfBufferedRouteKey;

  // 5M Buffered Queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springdfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  // High Queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springdfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springdfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springdfHighRouteKey;

  // 2H Low Buffered Queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springdfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  // Low Queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springdfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springdfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springdfLowRouteKey;


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
  Queue springdfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springdfBufferedTime;
    return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
  Queue springdfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
    return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
  Queue springdfHighQueue() {
    return new Queue(springdfHighQueue, true);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
  Queue springdfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
    return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
  Queue springdfLowQueue() {
    return new Queue(springdfLowQueue, true);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
  TopicExchange springdfBufferedTopic() {
    return new TopicExchange(springdfBufferedTopic);
  }

  @Bean
  Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
    return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
  TopicExchange springdfHighBufferedTopic() {
    return new TopicExchange(springdfHighBufferedTopic);
  }

  @Bean
  Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
    return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
  TopicExchange springdfHighTopic() {
    return new TopicExchange(springdfHighTopic);
  }

  @Bean
  Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
    return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
  TopicExchange springdfLowBufferedTopic() {
    return new TopicExchange(springdfLowBufferedTopic);
  }

  @Bean
  Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
    return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
  TopicExchange springdfLowTopic() {
    return new TopicExchange(springdfLowTopic);
  }

  @Bean
  Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
    return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
  }


  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springdfHighQueue, springdfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
    queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;

  }


  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // 是否持久化
    boolean durable = true;
    // 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除
    boolean exclusive = false;
    // 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}

消費(fèi)者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

  private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;
  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息從初始隊(duì)列進(jìn)入5分鐘的高速緩沖隊(duì)列
   * @param message
   */
  public void highReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);

    try{
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < highRetry) {
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
      } else {
        msg.put("times", 0);
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }
    }
  }

  /**
   * 消息從5分鐘緩沖隊(duì)列進(jìn)入2小時(shí)緩沖隊(duì)列
   * @param message
   */
  public void lowReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);
    
    try {
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowRetry) {
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }else{
        logger.info("消息無法被消費(fèi)...");
      }
    } 
  }
}

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • IDEA與JDK、Maven安裝配置完整步驟解析

    IDEA與JDK、Maven安裝配置完整步驟解析

    這篇文章主要介紹了如何安裝和配置IDE(IntelliJ?IDEA),包括IDE的安裝步驟、JDK的下載與配置、Maven的安裝與配置,以及如何在IDE中使用Maven進(jìn)行Java開發(fā),需要的朋友可以參考下
    2025-03-03
  • Java中的異步非阻塞AIO模型詳解

    Java中的異步非阻塞AIO模型詳解

    這篇文章主要介紹了Java中的異步非阻塞AIO模型詳解,AIO需要操作系統(tǒng)的支持,在linux內(nèi)核2.6版本中加入了對(duì)真正異步IO的支持,java從jdk1.7開始支持AIO,本文提供了部分實(shí)現(xiàn)代碼,需要的朋友可以參考下
    2023-09-09
  • springboot連接不上redis的三種解決辦法

    springboot連接不上redis的三種解決辦法

    這篇文章主要介紹了springboot連接不上redis的三種解決辦法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • Java中定時(shí)器Timer致命缺點(diǎn)案例詳解

    Java中定時(shí)器Timer致命缺點(diǎn)案例詳解

    這篇文章主要介紹了Java中定時(shí)器Timer致命缺點(diǎn),以Java中定時(shí)器Time為案例整理下我的學(xué)習(xí)方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-02-02
  • Java遞歸方法求5!的實(shí)現(xiàn)代碼

    Java遞歸方法求5!的實(shí)現(xiàn)代碼

    這篇文章主要介紹了Java遞歸方法求5!的實(shí)現(xiàn)代碼,需要的朋友可以參考下
    2017-02-02
  • Java 超詳細(xì)講解IO操作字節(jié)流與字符流

    Java 超詳細(xì)講解IO操作字節(jié)流與字符流

    本章具體介紹了字節(jié)流、字符流的基本使用方法,圖解穿插代碼實(shí)現(xiàn)。 JAVA從基礎(chǔ)開始講,后續(xù)會(huì)講到JAVA高級(jí),中間會(huì)穿插面試題和項(xiàng)目實(shí)戰(zhàn),希望能給大家?guī)韼椭?/div> 2022-03-03
  • java8中Map的一些騷操作總結(jié)

    java8中Map的一些騷操作總結(jié)

    這篇文章主要給大家介紹了關(guān)于java8中Map的一些騷操作,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • Protostuff序列化和反序列化的使用說明

    Protostuff序列化和反序列化的使用說明

    今天小編就為大家分享一篇關(guān)于Protostuff序列化和反序列化的使用說明,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-04-04
  • Java后端服務(wù)間歇性響應(yīng)慢的問題排查與解決

    Java后端服務(wù)間歇性響應(yīng)慢的問題排查與解決

    之前在公司內(nèi)其它團(tuán)隊(duì)找到幫忙排查的一個(gè)后端服務(wù)連接超時(shí)問題,問題的表現(xiàn)是服務(wù)部署到線上后出現(xiàn)間歇性請(qǐng)求響應(yīng)非常慢(大于10s),但是后端業(yè)務(wù)分析業(yè)務(wù)日志時(shí)卻沒有發(fā)現(xiàn)慢請(qǐng)求,所以本文給大家介紹了Java后端服務(wù)間歇性響應(yīng)慢的問題排查與解決,需要的朋友可以參考下
    2025-03-03
  • mybatis雙重foreach如何實(shí)現(xiàn)遍歷map中的兩個(gè)list數(shù)組

    mybatis雙重foreach如何實(shí)現(xiàn)遍歷map中的兩個(gè)list數(shù)組

    本文介紹了如何解析前端傳遞的JSON字符串并在Java后臺(tái)動(dòng)態(tài)構(gòu)建SQL查詢條件,首先,通過JSONArray.fromObject()將JSON字符串轉(zhuǎn)化為JSONArray對(duì)象,遍歷JSONArray,從中提取name和infos,構(gòu)建成Map對(duì)象用于Mybatis SQL映射
    2024-09-09

最新評(píng)論