欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ消息的延遲隊(duì)列詳解

 更新時(shí)間:2024年02月11日 10:31:27   作者:云村小威  
這篇文章主要介紹了RabbitMQ消息的延遲隊(duì)列,延遲隊(duì)列也就是死信交換機(jī),有些隊(duì)列的消息成為死信后,消息中間件可以將其從當(dāng)前隊(duì)列發(fā)送到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就是死信隊(duì)列,感興趣的同學(xué)可以參考下文

Dead Letter Exchange(死信交換機(jī))

在MQ中,當(dāng)消息成為死信(Dead message 死掉的信息)后,消息中間件可以將其從當(dāng)前隊(duì)列發(fā)送到另一個(gè)隊(duì)列中,這個(gè)隊(duì)列就是死信隊(duì)列。而 在RabbitMQ中,由于有交換機(jī)的概念,實(shí)際是將死信發(fā)送給了死信交換機(jī)(Dead Letter Exchange,簡稱DLX)。死信交換機(jī)和死信隊(duì)列和普通的沒有區(qū)別。

消息成為死信的情況

  • 隊(duì)列消息長度到達(dá)限制
  • 消費(fèi)者拒簽消息,并且不把消息重新放入原隊(duì)列
  • 消息到達(dá)存活時(shí)間未被消費(fèi)

有些隊(duì)列的消息成為死信后,(比如過期了或者隊(duì)列滿了)這些死信一般情況下是會被 RabbitMQ 清理的。但是你可以配置某個(gè)交換機(jī)為此隊(duì)列的死信交換機(jī),該隊(duì)列的消息成為死信后會被重新發(fā)送到此 DLX 。至于怎么處理這個(gè)DLX中的死信就是看具體的業(yè)務(wù)場景了,DLX 中的信息可以被路由到新的隊(duì)列。

生產(chǎn)者

    /**
     * 普通交換機(jī)綁定普通交換機(jī)
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //信息配置
        Map<String, Object> map = new HashMap<>();
        //message在該隊(duì)列queue的存活時(shí)間最大為15秒
        map.put("x-message-ttl", 15000);
        //x-dead-letter-exchange參數(shù)是設(shè)置該隊(duì)列的死信交換器(DLX)
        map.put("x-dead-letter-exchange", "exchangeB");
        //x-dead-letter-routing-key參數(shù)是給這個(gè)DLX指定路由鍵
        map.put("x-dead-letter-routing-key", "queueB");
        return new Queue("queueA", true, false, false, map);
    }
    @Bean
    public DirectExchange exchangeA() {
        return new DirectExchange("exchangeA");
    }
    @Bean
    public Binding bindingA() {
        return BindingBuilder
                .bind(queueA())
                .to(exchangeA()).with("queueA");
    }
    /**
     * 死信交換機(jī)綁定死信交換機(jī)
     *
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue("queueB");
    }
    @Bean
    public DirectExchange exchangeB() {
        return new DirectExchange("exchangeB");
    }
    @Bean
    public Binding bindingB() {
        return BindingBuilder
                .bind(queueB())
                .to(exchangeB()).with("queueB");
    }

模擬發(fā)送請求

    @RequestMapping("/send6")
    public String sendSix() throws JsonProcessingException {
        rabbitTemplate.convertAndSend("exchangeA", "queueA", "檢查訂單是否過期");
        return "??";
    }

這時(shí)我發(fā)送請求到隊(duì)列queueA,并設(shè)置了15秒的延遲,將超時(shí)的信息調(diào)用到死信交換機(jī)中。在這里我是沒開啟消費(fèi)者所有沒有消費(fèi)者去處理該請求的,信息在queueA隊(duì)列等待15秒后將會轉(zhuǎn)到死信交換機(jī)queueB隊(duì)列進(jìn)行處理:

延遲隊(duì)列

延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會被消費(fèi)。經(jīng)典的應(yīng)用場景是下單減庫存。

根據(jù)以上結(jié)論,在rabbitmq中消費(fèi)者只要接到信息就會自動(dòng)確認(rèn)進(jìn)行處理。所以在上面并沒有開啟消費(fèi)者,當(dāng)請求時(shí)效后(如訂單未支付,定時(shí)30分鐘自動(dòng)取消功能)我們不應(yīng)該再讓它正常處理,而把該請求放到死信交換機(jī)中安排對應(yīng)的處理,所以我們需要打消費(fèi)者自動(dòng)處理請求改成手動(dòng)。

如果手動(dòng)確認(rèn)則當(dāng)消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),手動(dòng)確認(rèn)可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則會發(fā)送到下一個(gè)消費(fèi)者

如果某個(gè)服務(wù)忘記 ACK 了,則 RabbitMQ 不會再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限

ACK 機(jī)制還可以起到限流作用,比如在接收到某條消息時(shí)休眠幾秒鐘

消息確認(rèn)模式有:

  • AcknowledgeMode.NONE:自動(dòng)確認(rèn)
  • AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)
  • AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)

確認(rèn)消息(局部方法處理消息)

默認(rèn)情況下消息消費(fèi)者是自動(dòng) ack (確認(rèn))消息的,如果要手動(dòng) ack(確認(rèn))則需要修改確認(rèn)模式為 manual

消費(fèi)者添加手動(dòng)確認(rèn)消息配置配置 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manua

消費(fèi)者接受消息:

package com.ycxw.consumer.demos;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DLXReceiver {
    @RabbitListener(queues = {"queueA"})
    @RabbitHandler
    public void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("已接受到隊(duì)列queueA傳遞過來的消息:" + msg);
        channel.basicReject(tag, false);// 拒接消息,如果為true則拒絕后又從新回到隊(duì)列被接受(循環(huán)),除非消息過期。
        //channel.basicAck(tag, true); 確認(rèn)消息()一次性全接受,如果為false則接受一次
    }
    /**
     * 接受死信消息
     *
     * @param msg
     */
    @RabbitListener(queues = {"queueB"})
    @RabbitHandler
    public void handlerB(String msg) {
        /**
         * ...接受到信息,去數(shù)據(jù)庫處理
         */
        System.out.println("已接受到隊(duì)列queueB傳遞過來的消息:" + msg);
    }
}

第一次進(jìn)入普通隊(duì)列別拒絕后,轉(zhuǎn)到死信隊(duì)列中處理...

需要注意的 basicAck 方法需要傳遞兩個(gè)參數(shù)

  • deliveryTag(唯一標(biāo)識 ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊后,會建立起一個(gè) Channel ,RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè) delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識 ID,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
  • multiple:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息

到此這篇關(guān)于RabbitMQ消息的延遲隊(duì)列詳解的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java的內(nèi)存區(qū)域與內(nèi)存溢出異常你了解嗎

    Java的內(nèi)存區(qū)域與內(nèi)存溢出異常你了解嗎

    這篇文章主要為大家詳細(xì)介紹了Java的內(nèi)存區(qū)域與內(nèi)存溢出異常,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • Java堆內(nèi)存又溢出了!教你一招必殺技(推薦)

    Java堆內(nèi)存又溢出了!教你一招必殺技(推薦)

    這篇文章主要介紹了Java內(nèi)存溢出問題,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • Spring如何使用@Indexed加快啟動(dòng)速度

    Spring如何使用@Indexed加快啟動(dòng)速度

    這篇文章主要介紹了Spring如何使用@Indexed加快啟動(dòng)速度,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java中Base64加密解密舉例詳解

    Java中Base64加密解密舉例詳解

    Base64編碼是我們程序開發(fā)中經(jīng)常使用到的編碼方法,它是一種基于用64個(gè)可打印字符來表示二進(jìn)制數(shù)據(jù)的表示方法,這篇文章主要給大家介紹了關(guān)于Java中Base64加密解密的相關(guān)資料,需要的朋友可以參考下
    2024-05-05
  • java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法

    java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法

    下面小編就為大家分享一篇java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-03-03
  • springboot如何實(shí)現(xiàn)自動(dòng)裝配源碼解讀

    springboot如何實(shí)現(xiàn)自動(dòng)裝配源碼解讀

    這篇文章主要介紹了springboot如何實(shí)現(xiàn)自動(dòng)裝配源碼賞析,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-12-12
  • Spring Boot 3 整合 Spring Cloud Gateway實(shí)踐過程

    Spring Boot 3 整合 Spring Cloud 

    本文介紹了如何使用SpringCloudAlibaba2023.0.0.0版本構(gòu)建一個(gè)微服務(wù)網(wǎng)關(guān),包括統(tǒng)一路由、限流防刷和登錄鑒權(quán)等功能,并通過一個(gè)項(xiàng)目實(shí)例進(jìn)行詳細(xì)說明,感興趣的朋友一起看看吧
    2025-02-02
  • Java?IDEA集成開發(fā)工具中英文切換圖文教程

    Java?IDEA集成開發(fā)工具中英文切換圖文教程

    相信很多小伙伴們剛接觸IDEA時(shí),看到一堆英文界面不知道如何下手,這篇文章主要給大家介紹了關(guān)于Java?IDEA集成開發(fā)工具中英文切換的相關(guān)資料,需要的朋友可以參考下
    2024-04-04
  • java中this關(guān)鍵字的詳細(xì)使用介紹

    java中this關(guān)鍵字的詳細(xì)使用介紹

    大家好,本篇文章主要講的是java中this關(guān)鍵字的詳細(xì)使用介紹,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽
    2022-01-01
  • Fastjson 常用API介紹及下載地址(推薦)

    Fastjson 常用API介紹及下載地址(推薦)

    Fastjson是一個(gè)Java語言編寫的高性能功能完善的JSON庫。接下來通過本文給大家分享Fastjson 常用API介紹及下載地址,感興趣的朋友一起看看吧
    2017-11-11

最新評論