欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springcloud中RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)方法

 更新時(shí)間:2022年05月31日 09:58:08   作者:wu55555  
死信隊(duì)列是消息隊(duì)列中非常重要的概念,同時(shí)我們需要業(yè)務(wù)場(chǎng)景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消,那么本期,我們就來講解死信隊(duì)列,以及如何通過延遲交換機(jī)來實(shí)現(xiàn)延遲發(fā)送的需求,感興趣的朋友一起看看吧

0.引言

死信隊(duì)列是消息隊(duì)列中非常重要的概念,同時(shí)我們需要業(yè)務(wù)場(chǎng)景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消。那么本期,我們就來講解死信隊(duì)列,以及如何通過延遲交換機(jī)來實(shí)現(xiàn)延遲發(fā)送的需求。

1. 死信隊(duì)列

1.2 什么是死信?

理解死信隊(duì)列前,我們先講解什么是死信,所謂死信就是沒有被成功消費(fèi)的消息,但并不是所有未成功消費(fèi)的消息都是死信消息,死信消息的產(chǎn)生來源于以下三種途徑: (1)消息被消費(fèi)者拒絕,參數(shù)requeue設(shè)置為false的消息 (2)過期的消息,過期消息分為兩種: a. 發(fā)送消息時(shí),設(shè)置了某一條消息的生存時(shí)間(message TTL),如果生存時(shí)間到了,消息還沒有被消費(fèi),就會(huì)被標(biāo)注為死信消息 b. 設(shè)置了隊(duì)列的消息生存時(shí)間,針對(duì)隊(duì)列中所有的消息,如果生存時(shí)間到了,消息還沒有被消費(fèi),就會(huì)被標(biāo)注為死信消息 (3)當(dāng)隊(duì)列達(dá)到了最大長(zhǎng)度后,再發(fā)送過來的消息就會(huì)直接變成死信消息

1.3 什么是死信隊(duì)列?

直接來講,用來盛裝死信的隊(duì)列就是死信隊(duì)列,好像是一句廢話,所以其重點(diǎn)在于理解死信的概念。

死信隊(duì)列的作用: (1)隊(duì)列在已滿的情況下,會(huì)將消息發(fā)送到死信隊(duì)列中,這樣消息就不會(huì)丟失了,回頭再?gòu)乃佬抨?duì)列里將消息取出來進(jìn)行消費(fèi)即可 (2)可以基于死信隊(duì)列實(shí)現(xiàn)延遲消費(fèi)的效果。具體的實(shí)現(xiàn)我們后續(xù)講解

1.4 創(chuàng)建死信交換機(jī)、死信隊(duì)列

死信交換機(jī)、死信隊(duì)列其實(shí)都是普通的交換機(jī)、隊(duì)列,只是專門聲明出來用于存儲(chǔ)死信消息的。我們只需要通過deadLetterExchange方法來聲明死信交換機(jī),然后用deadLetterRoutingKey方法來聲明死信隊(duì)列

如下代碼所示,我們創(chuàng)建了test.queue、test.exchangedead.queue、dead.exchange,并且在test.queue中將死信交換機(jī)和死信路由指定到了測(cè)試隊(duì)列中

注意:涉及到修改隊(duì)列、交換機(jī)屬性的,如果該隊(duì)列、交換機(jī)已經(jīng)存在需要將其刪除后才能生效,否則可能還會(huì)報(bào)錯(cuò)。

@Configuration
public class RabbitMqConfig {

    private static final String TEST_EXCHANGE = "test.exchange";
    private static final String TEST_QUEUE = "test.queue";
    private static final String TEST_ROUTING_KEY = "test.routing.key";
    private static final String DEAD_EXCHANGE = "dead.exchange";
    private static final String DEAD_QUEUE = "dead.queue";
    private static final String DEAD_ROUTING_KEY = "dead.routing.key";
    
    @Bean
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE);
    }
    public DirectExchange deadExchange(){
    // 設(shè)置演示,使用了直接交換機(jī)Direct,大家可以根據(jù)自己的業(yè)務(wù)情況聲明為其他類型的交換機(jī)
        return new DirectExchange(DEAD_EXCHANGE);
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
    public DirectExchange testExchange(){
        return new DirectExchange(TEST_EXCHANGE);
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
}

1.5 實(shí)現(xiàn)死信消息

1.5.1 基于消費(fèi)者進(jìn)行reject或nack實(shí)現(xiàn)死信消息

@Component
public class QueueListener {
    @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
    public void handler(MyMessage messageInfo, Message message, Channel channel) {
        try{
            System.out.println("接收的消息:"+messageInfo.toString());
            // requeue參數(shù)設(shè)置為false 設(shè)置死信消息
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            // multiple和requeue設(shè)置為false 設(shè)置死信消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            // 返回ack 確認(rèn)接收到消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (IOException e){
            try {
                channel.basicRecover();
            } catch (IOException ex) {
                ex.printStackTrace();
                log.error("消息處理失?。簕}",e.getMessage());
            }
        }
    }
}

1.5.2 基于生存時(shí)間實(shí)現(xiàn)

(1)發(fā)送消息時(shí)設(shè)置生存時(shí)間

    @GetMapping("sendTestQueueWithExpiration")
    public String sendTestQueueWithExpiration(){
        MyMessage message = new MyMessage(1L,"物流提醒","到達(dá)裝貨區(qū)域,注意上傳憑證",new Date());
        rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
            msg.getMessageProperties().setExpiration("5000");
            return msg;
        });
        return "發(fā)送成功";
    }

(2)隊(duì)列設(shè)置生存時(shí)間

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 10s 過期
                .ttl(10000)
                .build();
    }

1.5.3 基于隊(duì)列max_length實(shí)現(xiàn)

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 容量最大100條
                .maxLength(100)
                .build();
    }

1.6 基于死信隊(duì)列實(shí)現(xiàn)消息延遲發(fā)送

上述我們說過死信隊(duì)列還可以消息延遲發(fā)送,其思路就是: (1)消息發(fā)送時(shí)設(shè)置消息的生存時(shí)間,其生存時(shí)間就是我們想要延遲的時(shí)間 (2)消息者監(jiān)控死信隊(duì)列進(jìn)行消費(fèi)

正常隊(duì)列的消息因?yàn)闆]有消費(fèi)者消費(fèi),同時(shí)又指定了生存時(shí)間,到達(dá)時(shí)間后消息轉(zhuǎn)發(fā)到死信隊(duì)列中,消費(fèi)者監(jiān)聽了死信隊(duì)列從而將其消費(fèi)掉。

基于死信隊(duì)列實(shí)現(xiàn)消息延遲發(fā)送的問題

如果有兩個(gè)消息,一個(gè)是5s生存時(shí)間,一個(gè)是10s生存時(shí)間,當(dāng)我們先發(fā)送了10s生存時(shí)間的消息到queue中時(shí),因?yàn)閞abbitmq只會(huì)監(jiān)控隊(duì)列最外側(cè)的消息的生存時(shí)間,也就是監(jiān)控10s生存時(shí)間的消息,而5s生存時(shí)間的消息只會(huì)在最外側(cè)的10s消息到期后才會(huì)監(jiān)控,也就導(dǎo)致我實(shí)際需要5s生存的消息,實(shí)際需要10s才監(jiān)聽到了。

所以呢,基于死信隊(duì)列實(shí)現(xiàn)的延遲消息,只使用于延遲時(shí)間一致的消息。

為了適配更多的延遲場(chǎng)景,已經(jīng)更加簡(jiǎn)單的實(shí)現(xiàn)延遲消息,我們引入了延遲交換機(jī)

2. 延遲交換機(jī)

延遲交換機(jī)并不是rabbitmq自帶的功能,而是要通過安裝延遲交換機(jī)插件delayed_message_exchange來實(shí)現(xiàn)

其插件的安裝我們之間已經(jīng)講解過,不再累敘,可以參考如下博文 springcloud:安裝rabbitmq并配置延遲隊(duì)列插件

通過延遲交換機(jī)實(shí)現(xiàn)的延遲消息,其重點(diǎn)主要在交換機(jī)上,隊(duì)列就是普通隊(duì)列,消息發(fā)送到交換機(jī)上后,會(huì)記錄消息的延遲時(shí)間,到達(dá)時(shí)間后才會(huì)發(fā)送到隊(duì)列中,這樣消費(fèi)者通過監(jiān)控隊(duì)列,就能在指定時(shí)間獲取到消息

因此延遲交換機(jī)與普通交換機(jī)的實(shí)現(xiàn),只在創(chuàng)建交換機(jī)時(shí),其他的操作與普通交換機(jī)無異,因此使用起來也很方便

創(chuàng)建延遲交換機(jī),通過x-delayed-type屬性聲明交換機(jī)類型,可以是direct也可以是topic,具體支持4中交換機(jī)類型,如果不清楚的可以參考之前的博文

@Configuration
public class RabbitMqDelayConfig {
    public static final String DELAY_EXCHANGE = "delay.exchange";
    public static final String DELAY_QUEUE = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    @Bean
    public Exchange delayExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
    }
    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE);
    }
    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }

}

發(fā)送消息時(shí)指定延遲時(shí)間,單位毫秒

rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });

我們還可以將該方法封裝為工具類方法,方便之后調(diào)用

/**
	 * 發(fā)送 延遲隊(duì)列
	 * @param exchange 交換機(jī)
	 * @param routeKey 路由
	 * @param message 消息
	 * @param delaySecond 延遲秒數(shù)
	 */
	public void send(String exchange, String routeKey, Object message, int delaySecond){  
		rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
			// 消息持久化 
			msg.getMessageProperties().setDelay(delaySecond * 1000);
			return msg;
		});
	}

3. 應(yīng)用場(chǎng)景

延遲消息的應(yīng)用場(chǎng)景豐富,除了我們開篇所說的30分鐘未支付自動(dòng)取消訂單,還比如到貨后72小時(shí)未簽收自動(dòng)簽收

基本上所有需要延遲觸發(fā)的業(yè)務(wù)場(chǎng)景都可以用rabbitmq延遲隊(duì)列來實(shí)現(xiàn)。

4. 練習(xí)題

對(duì)于剛接觸rabbitmq的同學(xué),這里我提供一個(gè)練習(xí)題給大家,也讓大家在實(shí)操中加強(qiáng)對(duì)于rabbitmq的理解:

需求:訂單到貨后72小時(shí)未簽收,自動(dòng)簽收 講解:我們這里要實(shí)現(xiàn)訂單到貨后的自動(dòng)簽收功能,訂單到貨后會(huì)觸發(fā)發(fā)送自動(dòng)簽收消息的方法,訂單已簽收的狀態(tài)status為2,到貨狀態(tài)為1,如果72小時(shí)前已經(jīng)簽收了即status被更新為2了,那么需要取消自動(dòng)簽收(不執(zhí)行自動(dòng)簽收,即忽略自動(dòng)簽收消息)

到此這篇關(guān)于springcloud:RabbitMQ死信隊(duì)列與延遲交換機(jī)實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)springcloud RabbitMQ死信隊(duì)列與延遲交換機(jī)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論