欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springcloud中RabbitMQ死信隊列與延遲交換機(jī)實現(xiàn)方法

 更新時間:2022年05月31日 09:58:08   作者:wu55555  
死信隊列是消息隊列中非常重要的概念,同時我們需要業(yè)務(wù)場景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消,那么本期,我們就來講解死信隊列,以及如何通過延遲交換機(jī)來實現(xiàn)延遲發(fā)送的需求,感興趣的朋友一起看看吧

0.引言

死信隊列是消息隊列中非常重要的概念,同時我們需要業(yè)務(wù)場景中都需要延遲發(fā)送的概念,比如12306中的30分鐘后未支付訂單取消。那么本期,我們就來講解死信隊列,以及如何通過延遲交換機(jī)來實現(xiàn)延遲發(fā)送的需求。

1. 死信隊列

1.2 什么是死信?

理解死信隊列前,我們先講解什么是死信,所謂死信就是沒有被成功消費的消息,但并不是所有未成功消費的消息都是死信消息,死信消息的產(chǎn)生來源于以下三種途徑: (1)消息被消費者拒絕,參數(shù)requeue設(shè)置為false的消息 (2)過期的消息,過期消息分為兩種: a. 發(fā)送消息時,設(shè)置了某一條消息的生存時間(message TTL),如果生存時間到了,消息還沒有被消費,就會被標(biāo)注為死信消息 b. 設(shè)置了隊列的消息生存時間,針對隊列中所有的消息,如果生存時間到了,消息還沒有被消費,就會被標(biāo)注為死信消息 (3)當(dāng)隊列達(dá)到了最大長度后,再發(fā)送過來的消息就會直接變成死信消息

1.3 什么是死信隊列?

直接來講,用來盛裝死信的隊列就是死信隊列,好像是一句廢話,所以其重點在于理解死信的概念。

死信隊列的作用: (1)隊列在已滿的情況下,會將消息發(fā)送到死信隊列中,這樣消息就不會丟失了,回頭再從死信隊列里將消息取出來進(jìn)行消費即可 (2)可以基于死信隊列實現(xiàn)延遲消費的效果。具體的實現(xiàn)我們后續(xù)講解

1.4 創(chuàng)建死信交換機(jī)、死信隊列

死信交換機(jī)、死信隊列其實都是普通的交換機(jī)、隊列,只是專門聲明出來用于存儲死信消息的。我們只需要通過deadLetterExchange方法來聲明死信交換機(jī),然后用deadLetterRoutingKey方法來聲明死信隊列

如下代碼所示,我們創(chuàng)建了test.queue、test.exchangedead.queue、dead.exchange,并且在test.queue中將死信交換機(jī)和死信路由指定到了測試隊列中

注意:涉及到修改隊列、交換機(jī)屬性的,如果該隊列、交換機(jī)已經(jīng)存在需要將其刪除后才能生效,否則可能還會報錯。

@Configuration
public class RabbitMqConfig {

    private static final String TEST_EXCHANGE = "test.exchange";
    private static final String TEST_QUEUE = "test.queue";
    private static final String TEST_ROUTING_KEY = "test.routing.key";
    private static final String DEAD_EXCHANGE = "dead.exchange";
    private static final String DEAD_QUEUE = "dead.queue";
    private static final String DEAD_ROUTING_KEY = "dead.routing.key";
    
    @Bean
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE);
    }
    public DirectExchange deadExchange(){
    // 設(shè)置演示,使用了直接交換機(jī)Direct,大家可以根據(jù)自己的業(yè)務(wù)情況聲明為其他類型的交換機(jī)
        return new DirectExchange(DEAD_EXCHANGE);
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
    public DirectExchange testExchange(){
        return new DirectExchange(TEST_EXCHANGE);
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
}

1.5 實現(xiàn)死信消息

1.5.1 基于消費者進(jìn)行reject或nack實現(xiàn)死信消息

@Component
public class QueueListener {
    @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
    public void handler(MyMessage messageInfo, Message message, Channel channel) {
        try{
            System.out.println("接收的消息:"+messageInfo.toString());
            // requeue參數(shù)設(shè)置為false 設(shè)置死信消息
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            // multiple和requeue設(shè)置為false 設(shè)置死信消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            // 返回ack 確認(rèn)接收到消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (IOException e){
            try {
                channel.basicRecover();
            } catch (IOException ex) {
                ex.printStackTrace();
                log.error("消息處理失?。簕}",e.getMessage());
            }
        }
    }
}

1.5.2 基于生存時間實現(xiàn)

(1)發(fā)送消息時設(shè)置生存時間

    @GetMapping("sendTestQueueWithExpiration")
    public String sendTestQueueWithExpiration(){
        MyMessage message = new MyMessage(1L,"物流提醒","到達(dá)裝貨區(qū)域,注意上傳憑證",new Date());
        rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
            msg.getMessageProperties().setExpiration("5000");
            return msg;
        });
        return "發(fā)送成功";
    }

(2)隊列設(shè)置生存時間

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 10s 過期
                .ttl(10000)
                .build();
    }

1.5.3 基于隊列max_length實現(xiàn)

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 容量最大100條
                .maxLength(100)
                .build();
    }

1.6 基于死信隊列實現(xiàn)消息延遲發(fā)送

上述我們說過死信隊列還可以消息延遲發(fā)送,其思路就是: (1)消息發(fā)送時設(shè)置消息的生存時間,其生存時間就是我們想要延遲的時間 (2)消息者監(jiān)控死信隊列進(jìn)行消費

正常隊列的消息因為沒有消費者消費,同時又指定了生存時間,到達(dá)時間后消息轉(zhuǎn)發(fā)到死信隊列中,消費者監(jiān)聽了死信隊列從而將其消費掉。

基于死信隊列實現(xiàn)消息延遲發(fā)送的問題

如果有兩個消息,一個是5s生存時間,一個是10s生存時間,當(dāng)我們先發(fā)送了10s生存時間的消息到queue中時,因為rabbitmq只會監(jiān)控隊列最外側(cè)的消息的生存時間,也就是監(jiān)控10s生存時間的消息,而5s生存時間的消息只會在最外側(cè)的10s消息到期后才會監(jiān)控,也就導(dǎo)致我實際需要5s生存的消息,實際需要10s才監(jiān)聽到了。

所以呢,基于死信隊列實現(xiàn)的延遲消息,只使用于延遲時間一致的消息。

為了適配更多的延遲場景,已經(jīng)更加簡單的實現(xiàn)延遲消息,我們引入了延遲交換機(jī)

2. 延遲交換機(jī)

延遲交換機(jī)并不是rabbitmq自帶的功能,而是要通過安裝延遲交換機(jī)插件delayed_message_exchange來實現(xiàn)

其插件的安裝我們之間已經(jīng)講解過,不再累敘,可以參考如下博文 springcloud:安裝rabbitmq并配置延遲隊列插件

通過延遲交換機(jī)實現(xiàn)的延遲消息,其重點主要在交換機(jī)上,隊列就是普通隊列,消息發(fā)送到交換機(jī)上后,會記錄消息的延遲時間,到達(dá)時間后才會發(fā)送到隊列中,這樣消費者通過監(jiān)控隊列,就能在指定時間獲取到消息

因此延遲交換機(jī)與普通交換機(jī)的實現(xiàn),只在創(chuàng)建交換機(jī)時,其他的操作與普通交換機(jī)無異,因此使用起來也很方便

創(chuàng)建延遲交換機(jī),通過x-delayed-type屬性聲明交換機(jī)類型,可以是direct也可以是topic,具體支持4中交換機(jī)類型,如果不清楚的可以參考之前的博文

@Configuration
public class RabbitMqDelayConfig {
    public static final String DELAY_EXCHANGE = "delay.exchange";
    public static final String DELAY_QUEUE = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    @Bean
    public Exchange delayExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
    }
    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE);
    }
    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }

}

發(fā)送消息時指定延遲時間,單位毫秒

rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });

我們還可以將該方法封裝為工具類方法,方便之后調(diào)用

/**
	 * 發(fā)送 延遲隊列
	 * @param exchange 交換機(jī)
	 * @param routeKey 路由
	 * @param message 消息
	 * @param delaySecond 延遲秒數(shù)
	 */
	public void send(String exchange, String routeKey, Object message, int delaySecond){  
		rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
			// 消息持久化 
			msg.getMessageProperties().setDelay(delaySecond * 1000);
			return msg;
		});
	}

3. 應(yīng)用場景

延遲消息的應(yīng)用場景豐富,除了我們開篇所說的30分鐘未支付自動取消訂單,還比如到貨后72小時未簽收自動簽收

基本上所有需要延遲觸發(fā)的業(yè)務(wù)場景都可以用rabbitmq延遲隊列來實現(xiàn)。

4. 練習(xí)題

對于剛接觸rabbitmq的同學(xué),這里我提供一個練習(xí)題給大家,也讓大家在實操中加強(qiáng)對于rabbitmq的理解:

需求:訂單到貨后72小時未簽收,自動簽收 講解:我們這里要實現(xiàn)訂單到貨后的自動簽收功能,訂單到貨后會觸發(fā)發(fā)送自動簽收消息的方法,訂單已簽收的狀態(tài)status為2,到貨狀態(tài)為1,如果72小時前已經(jīng)簽收了即status被更新為2了,那么需要取消自動簽收(不執(zhí)行自動簽收,即忽略自動簽收消息)

到此這篇關(guān)于springcloud:RabbitMQ死信隊列與延遲交換機(jī)實現(xiàn)的文章就介紹到這了,更多相關(guān)springcloud RabbitMQ死信隊列與延遲交換機(jī)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論