欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RabbitMQ實現(xiàn)延遲隊列的示例詳解

 更新時間:2023年04月19日 08:31:34   作者:越走越遠的風  
這篇文章主要為大家詳細介紹了SpringBoot如何整合RabbitMQ實現(xiàn)延遲隊列,文中的示例代碼講解詳細,具有一定的學習價值,感興趣的可以了解一下

如何保證消息不丟失

rabbitmq消息投遞路徑

生產(chǎn)者->交換機->隊列->消費者

總的來說分為三個階段。

  • 1.生產(chǎn)者保證消息投遞可靠性。
  • 2.mq內(nèi)部消息不丟失。
  • 3.消費者消費成功。

什么是消息投遞可靠性

簡單點說就是消息百分百發(fā)送到消息隊列中。

我們可以開啟confirmCallback

生產(chǎn)者投遞消息后,mq會給生產(chǎn)者一個ack.根據(jù)ack,生產(chǎn)者就可以確認這條消息是否發(fā)送到mq.

開啟confirmCallback

修改配置文件

#NONE:禁用發(fā)布確認模式,是默認值,CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法
spring:
  rabbitmq:
    publisher-confirm-type: correlated

測試代碼

@Test  
public void testConfirmCallback() throws InterruptedException {  
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
    /**  
    *  
    * @param correlationData 配置  
    * @param ack 交換機是否收到消息,true是成功,false是失敗  
    * @param cause 失敗的原因  
    */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println("confirm=====>");  
        System.out.println("confirm==== ack="+ack);  
        System.out.println("confirm==== cause="+cause);  
        //根據(jù)ACK狀態(tài)做對應的消息更新操作 TODO  
    }  
    });  
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "雞你太美");  
    Thread.sleep(10000);  
}

通過returnCallback保證消息從交換器發(fā)送到隊列成功。 修改配置文件

spring:
  rabbitmq:
    #開啟returnCallback
    publisher-returns: true
    #交換機處理消息到路由失敗,則會返回給生產(chǎn)者
    template:
      mandatory: true

測試代碼

@Test  
void testReturnCallback() {  
    //為true,則交換機處理消息到路由失敗,則會返回給生產(chǎn)者 配置文件指定,則這里不需指定  
    rabbitTemplate.setMandatory(true);  
    //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息  
    rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code="+code);  
        System.out.println("returned="+ returned);  
    });  
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","測試returnCallback");  
}

消費者消費消息時需要通過ack手動確認消息已消費。

修改配置文件

spring:
  rabbitmq:
    listener:  
      simple:  
        acknowledge-mode: manual

編寫測試代碼

@RabbitHandler  
public void consumer(String body, Message message, Channel channel) throws IOException {  
    long msgTag = message.getMessageProperties().getDeliveryTag();  
    System.out.println("msgTag="+msgTag);  
    System.out.println("message="+ message);  
    System.out.println("body="+body);  

    //成功確認,使用此回執(zhí)方法后,消息會被 rabbitmq broker 刪除  
    channel.basicAck(msgTag,false);  
    // channel.basicNack(msgTag,false,true);  
  
}

deliveryTags是消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加

ttl死信隊列

什么是死信隊列

沒有被及時消費的消息存放的隊列

消息有哪幾種情況成為死信

  • 消費者拒收消息 (basic.reject/ basic.nack) ,并且沒有重新入隊 requeue=false
  • 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
  • 隊列的消息長度達到極限
  • 結(jié)果:消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

死信隊列經(jīng)常用來做延遲隊列消費。

延遲隊列

生產(chǎn)者投遞到mq中并不希望這條消息立馬被消費,而是等待一段時間后再去消費。

springboot整合rabbitmq實現(xiàn)訂單超時自動關(guān)閉

package com.fandf.test.rabbit;  
  
import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
import java.util.HashMap;  
import java.util.Map;  
  
/**  
* @author fandongfeng  
* @date 2023/4/15 15:38  
*/  
@Configuration  
public class RabbitMQConfig {  
  
    /**  
    * 訂單交換機  
    */  
    public static final String ORDER_EXCHANGE = "order_exchange";  
    /**  
    * 訂單隊列  
    */  
    public static final String ORDER_QUEUE = "order_queue";  
    /**  
    * 訂單路由key  
    */  
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  

    /**  
    * 死信交換機  
    */  
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
    /**  
    * 死信隊列 routingKey  
    */  
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  

    /**  
    * 死信隊列  
    */  
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  


    /**  
    * 創(chuàng)建死信交換機  
    */  
    @Bean("orderDeadLetterExchange")  
    public Exchange orderDeadLetterExchange() {  
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
    }  

    /**  
    * 創(chuàng)建死信隊列  
    */  
    @Bean("orderDeadLetterQueue")  
    public Queue orderDeadLetterQueue() {  
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
    }  

    /**  
    * 綁定死信交換機和死信隊列  
    */  
    @Bean("orderDeadLetterBinding")  
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
    }  


    /**  
    * 創(chuàng)建訂單交換機  
    */  
    @Bean("orderExchange")  
    public Exchange orderExchange() {  
        return new TopicExchange(ORDER_EXCHANGE, true, false);  
    }  

    /**  
    * 創(chuàng)建訂單隊列  
    */  
    @Bean("orderQueue")  
    public Queue orderQueue() {  
        Map<String, Object> args = new HashMap<>(3);  
        //消息過期后,進入到死信交換機  
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  

        //消息過期后,進入到死信交換機的路由key  
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  

        //過期時間,單位毫秒  
        args.put("x-message-ttl", 10000);  

        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
    }  

    /**  
    * 綁定訂單交換機和隊列  
    */  
    @Bean("orderBinding")  
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
    }  
  
  
}

消費者

package com.fandf.test.rabbit;  
  
import cn.hutool.core.date.DateUtil;  
import com.rabbitmq.client.Channel;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
import java.io.IOException;  
  
/**  
* @author fandongfeng  
* @date 2023/4/15 15:42  
*/  
@Component  
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
public class OrderMQListener {  
  
  
  
    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        System.out.println("收到消息:" + DateUtil.now());  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag=" + msgTag);  
        System.out.println("message=" + message);  
        System.out.println("body=" + body);  
        channel.basicAck(msgTag, false);  
    }  
  
}

測試類

@Test  
void testOrder() throws InterruptedException {  
//為true,則交換機處理消息到路由失敗,則會返回給生產(chǎn)者 配置文件指定,則這里不需指定  
    rabbitTemplate.setMandatory(true);  
    //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息  
    rabbitTemplate.setReturnsCallback(returned -> {  
    int code = returned.getReplyCode();  
    System.out.println("code=" + code);  
    System.out.println("returned=" + returned);  
    });  
    rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "測試訂單延遲");  
    System.out.println("發(fā)送消息:" + DateUtil.now());  
    Thread.sleep(20000);  
}

程序輸出

發(fā)送消息:2023-04-16 15:14:34
收到消息:2023-04-16 15:14:44
msgTag=1
message=(Body:'測試訂單延遲' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=測試訂單延遲

到此這篇關(guān)于SpringBoot整合RabbitMQ實現(xiàn)延遲隊列的示例詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論