欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java使用RabbitMQ實現(xiàn)延遲消息示例

 更新時間:2024年10月22日 08:33:28   作者:java炒飯小能手  
本文介紹了在分布式系統(tǒng)中,使用RabbitMQ實現(xiàn)延遲消息處理,其中詳細闡述了RabbitMQ隊列和交換機的配置、消息的發(fā)送與接收以及死信隊列的處理,具有一定的參考價值,感興趣的可以了解一下

在分布式系統(tǒng)中,消息隊列通常用于解耦服務(wù),RabbitMQ是一個廣泛使用的消息隊列服務(wù)。延遲消息(也稱為延時隊列或TTL消息)是一種常見的場景應(yīng)用,特別適合處理某些任務(wù)在一段時間后執(zhí)行的需求,如訂單超時處理、延時通知等。

本文將以具體代碼為例,展示如何使用RabbitMQ來實現(xiàn)延遲消息處理,涵蓋隊列和交換機的配置、消息的發(fā)送與接收以及死信隊列的處理。

什么是延遲消息?

延遲消息是指消息在發(fā)送到隊列后,經(jīng)過設(shè)定的時間延遲再被消費。RabbitMQ 本身沒有直接支持延遲隊列的功能,但可以通過 TTL(Time To Live)+ 死信隊列(Dead Letter Queue, DLQ) 的組合來實現(xiàn)。當(dāng)消息超過TTL(消息存活時間)后,不會被立即消費,而是會被轉(zhuǎn)發(fā)到綁定的死信隊列,從而實現(xiàn)延遲處理。

RabbitMQ中的延遲消息原理

在RabbitMQ中,我們可以通過以下幾個概念來實現(xiàn)延遲消息:

  • TTL(Time To Live):可以為隊列設(shè)置TTL,消息超過該時間后會被標(biāo)記為“死信”。
  • 死信隊列(Dead Letter Queue):當(dāng)消息在正常隊列中過期或處理失敗時,RabbitMQ可以將它們路由到一個死信隊列,死信隊列可以用來處理這些過期或未處理的消息。
  • x-dead-letter-exchange 和 x-dead-letter-routing-key:可以通過配置隊列的參數(shù),將過期消息發(fā)送到一個專門的死信交換器,并根據(jù)指定的路由鍵轉(zhuǎn)發(fā)到死信隊列。

 消息來到ttl.queue消息隊列,過期時間內(nèi)無人消費,消息來到死信交換機hmall.direct,在direct.queue消息隊列無需等待。

1. RabbitMQ的配置

首先,我們需要配置兩個隊列和兩個交換機:一個用于存放延時消息,另一個用于處理超時的死信消息。

package com.heima.stroke.configuration;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    // 延遲時間 單位:毫秒 (這里設(shè)為30秒)
    private static final long DELAY_TIME = 1000 * 30;

    // 行程超時隊列
    public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";
    // 行程死信隊列
    public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";

    // 行程超時隊列交換機
    public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";
    // 行程死信隊列交換機
    public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";

    // 行程超時交換機 Routing Key
    public static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";
    // 行程死信交換機 Routing Key
    public static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";

    /**
     * 聲明行程超時隊列,并設(shè)置其參數(shù)
     * x-dead-letter-exchange:綁定的死信交換機
     * x-dead-letter-routing-key:死信路由Key
     * x-message-ttl:消息的過期時間
     */
    @Bean
    public Queue strokeOverQueue() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);
        args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);
        args.put("x-message-ttl", DELAY_TIME); // 設(shè)置TTL為30秒
        return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();
    }

    @Bean
    public DirectExchange strokeOverQueueExchange() {
        return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);
    }

    @Bean
    public Binding bindingStrokeOverDirect() {
        return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);
    }
}

解釋:

TTL設(shè)置:我們通過x-message-ttl設(shè)置消息的過期時間為30秒。

死信隊列綁定:通過x-dead-letter-exchangex-dead-letter-routing-key設(shè)置,當(dāng)消息過期時,它會被轉(zhuǎn)發(fā)到死信交換機,再路由到死信隊列。

2. 生產(chǎn)者發(fā)送延遲消息

接下來,我們通過生產(chǎn)者向超時隊列發(fā)送消息,這些消息將在TTL過期后轉(zhuǎn)發(fā)到死信隊列。

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MQProducer {
    private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 發(fā)送延時消息到行程超時隊列
     *
     * @param strokeVO 消息體
     */
    public void sendOver(StrokeVO strokeVO) {
        String mqMessage = JSON.toJSONString(strokeVO);
        logger.info("send timeout msg:{}", mqMessage);

        rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);
    }
}

解釋:

sendOver 方法將消息發(fā)送到超時隊列,消息將在超時后進入死信隊列。生產(chǎn)者不需要額外處理TTL或死信的配置,只需發(fā)送消息即可。

3. 消費者監(jiān)聽死信隊列

當(dāng)消息超過TTL后,將會被轉(zhuǎn)發(fā)到死信隊列。消費者需要監(jiān)聽死信隊列并處理這些消息。

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQConsumer {
    private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);

    @Autowired
    private StrokeHandler strokeHandler;

    /**
     * 監(jiān)聽死信隊列
     *
     * @param message 消息體
     * @param channel RabbitMQ的Channel
     * @param tag 消息的Delivery Tag
     */
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),
                            exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),
                            key = RabbitConfig.STROKE_DEAD_KEY)
            })
    @RabbitHandler
    public void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);
        logger.info("get dead msg:{}", message.getBody());
        
        if (strokeVO == null) {
            return;
        }

        try {
            // 處理超時的行程消息
            strokeHandler.timeoutHandel(strokeVO);
            // 手動確認消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

解釋:

@RabbitListener 注解綁定了死信隊列的監(jiān)聽器。當(dāng)消息被轉(zhuǎn)發(fā)到死信隊列時,該消費者會接收到消息。

使用 channel.basicAck(tag, false) 手動確認消息處理成功,確保消息不會重復(fù)消費。

4. 處理超時業(yè)務(wù)邏輯

在我們的業(yè)務(wù)中,當(dāng)消息超時未處理時,將其狀態(tài)設(shè)置為超時。

public void timeoutHandel(StrokeVO strokeVO) {
    // 獲取司機行程ID和乘客行程ID
    String inviterTripId = strokeVO.getInviterTripId();
    String inviteeTripId = strokeVO.getInviteeTripId();

    // 檢查邀請狀態(tài)是否為未確認
    String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);
    String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);

    if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&
        String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {
        // 更新為超時狀態(tài)
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
    }
}

到此這篇關(guān)于java使用RabbitMQ實現(xiàn)延遲消息示例的文章就介紹到這了,更多相關(guān)java RabbitMQ延遲消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何基于js及java分析并封裝排序算法

    如何基于js及java分析并封裝排序算法

    這篇文章主要介紹了如何基于js及java分析并封裝排序算法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-07-07
  • IDEA 自帶的數(shù)據(jù)庫工具真的很牛(收藏版)

    IDEA 自帶的數(shù)據(jù)庫工具真的很牛(收藏版)

    這篇文章主要介紹了IDEA 自帶的數(shù)據(jù)庫工具真的很牛(收藏版),本文以 IntelliJ IDEA/ Mac 版本作為演示,其他版本的應(yīng)該也差距不大,需要的朋友可以參考下
    2021-04-04
  • springBoot項目如何實現(xiàn)啟動多個實例

    springBoot項目如何實現(xiàn)啟動多個實例

    這篇文章主要介紹了springBoot項目如何實現(xiàn)啟動多個實例的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java中Druid連接池連接超時獲取不到連接的解決

    Java中Druid連接池連接超時獲取不到連接的解決

    這篇文章主要介紹了Java中Druid連接池連接超時獲取不到連接的解決,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • Maven項目繼承實現(xiàn)過程圖解

    Maven項目繼承實現(xiàn)過程圖解

    這篇文章主要介紹了Maven項目繼承實現(xiàn)過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • 關(guān)于@Autowierd && @Resource 你真的了解嗎

    關(guān)于@Autowierd && @Resource 你真的了解嗎

    這篇文章主要介紹了關(guān)于@Autowierd && @Resource的具體使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • spring集成mybatis實現(xiàn)mysql數(shù)據(jù)庫讀寫分離

    spring集成mybatis實現(xiàn)mysql數(shù)據(jù)庫讀寫分離

    本文通過實例代碼給大家介紹了spring集成mybatis實現(xiàn)mysql數(shù)據(jù)庫讀寫分離,需要的朋友可以參考下
    2017-08-08
  • Java并發(fā)之CAS原理詳解

    Java并發(fā)之CAS原理詳解

    這篇文章主要為大家詳細介紹了Java的CAS原理,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • 詳談java 堆區(qū)、方法區(qū)和棧區(qū)

    詳談java 堆區(qū)、方法區(qū)和棧區(qū)

    下面小編就為大家?guī)硪黄斦刯ava 堆區(qū)、方法區(qū)和棧區(qū)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • 使用springboot實現(xiàn)上傳文件時校驗文件是否有病毒

    使用springboot實現(xiàn)上傳文件時校驗文件是否有病毒

    在SpringBoot中實現(xiàn)文件上傳時的病毒校驗,可以使用ClamAV、Metascan或VirusTotal等工具,這些工具通過掃描上傳的文件,可以有效地檢測和阻止惡意軟件的傳播,安裝和配置ClamAV服務(wù)的步驟如下:下載并安裝ClamAV二進制文件,配置clamd.conf文件
    2025-01-01

最新評論