欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)

 更新時(shí)間:2024年10月11日 09:39:36   作者:寰夢  
本文詳細(xì)介紹了使用RabbitTemplate進(jìn)行消息傳遞的幾種模式,包括點(diǎn)對點(diǎn)通信、發(fā)布/訂閱模式、工作隊(duì)列模式、路由模式和主題模式,每種模式都通過代碼示例展示了生產(chǎn)者和消費(fèi)者的實(shí)現(xiàn),幫助開發(fā)者理解和運(yùn)用RabbitMQ進(jìn)行高效的消息處理

一、RabbitTemplate 的使用

1.【導(dǎo)入依賴】

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.1</version>
</dependency>

2.【添加配置】

rabbitmq:
    host:   #ip地址
    port: 5672 #端口
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 默認(rèn)每次取出一條消息消費(fèi), 消費(fèi)完成取下一條
        acknowledge-mode: manual # 設(shè)置消費(fèi)端手動ack確認(rèn)
        retry:
          enabled: true # 是否支持重試
    publisher-confirm-type: correlated  #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
    publisher-returns: true  #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)

3.【點(diǎn)對點(diǎn)通信(隊(duì)列模式)(Point-to-Point Messaging)】

使用方式:

這種方式也被稱為隊(duì)列(Queue)模型。消息發(fā)送者(Producer)發(fā)送消息到隊(duì)列,然后消息接收者(Consumer)從隊(duì)列中獲取消息進(jìn)行處理。這種模型下,每個(gè)消息只有一個(gè)消費(fèi)者可以接收,確保消息的可靠傳遞和順序處理。

代碼示例: 生產(chǎn)者

    /**
     * 第一種模型: 簡單模型
     * 一個(gè)消息生產(chǎn)者  一個(gè)隊(duì)列  一個(gè)消費(fèi)者
     * @return
     */
    @GetMapping("hello/world")
    public void helloWorld() {
        SysUser sysUser = new SysUser();
        // 發(fā)送消息
        // 第一個(gè)參數(shù): String routingKey 路由規(guī)則 【交換機(jī) 和隊(duì)列的綁定規(guī)則 】  隊(duì)列名稱
        // 第二個(gè)參數(shù): object message 消息的內(nèi)容
//        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");
        ///  MessagePostProcessor 消息包裝器  如果需要對消息進(jìn)行包裝
        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {
            // 設(shè)置唯一的標(biāo)識
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            return message;
        });

消費(fèi)者

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class HelloWorldConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * 監(jiān)聽 hello_world_queue 隊(duì)列消費(fèi)消息
     * queues 監(jiān)聽隊(duì)列的名稱  要求這個(gè)隊(duì)列必須是已經(jīng)存在的隊(duì)列
     * queuesToDeclare 監(jiān)聽隊(duì)列 如果這個(gè)隊(duì)列不存在 則 rabbitMQ 中 RabbitAdmin 會幫助去構(gòu)建這個(gè)隊(duì)列
     */
    @RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))
    public void helloWorldConsumer(String msg, Message message, Channel channel) {
        // 獲取消息的唯一標(biāo)識
        String messageId = message.getMessageProperties().getMessageId();
        // 將消息添加到 Redis的set集合中  set 不能重復(fù)的  方法的返回值 添加成功的數(shù)量
        Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);
        if (count != null && count == 1) {
            // 沒有消費(fèi)過   正常消費(fèi)
            log.info("hello_world_queue隊(duì)列消費(fèi)者接收到了消息,消息內(nèi)容:{}", message);
        }
    }
}

4.【發(fā)布/訂閱模式(Publish/Subscribe Messaging)】

使用方式:

在發(fā)布/訂閱模式中,消息發(fā)送者將消息發(fā)布到交換機(jī)(Exchange),而不是直接發(fā)送到隊(duì)列。交換機(jī)負(fù)責(zé)將消息路由到一個(gè)或多個(gè)綁定的隊(duì)列中。每個(gè)訂閱者(Subscriber)可以選擇訂閱它感興趣的消息隊(duì)列,從而接收消息。

代碼示例: 生產(chǎn)者

/**
 * 工作隊(duì)列
 * 一個(gè)生產(chǎn)者  一個(gè)隊(duì)列  多個(gè)消費(fèi)者
 */
@GetMapping("work/queue")
public void workQueue() {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");
    }
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class WorkQueueConsumer {
    /***
     * 消費(fèi)者1
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer(String message) throws InterruptedException {
        Thread.sleep(200);
        log.info("work_queue隊(duì)列消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /***
     * 消費(fèi)者2
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer2(String message) throws InterruptedException {
        Thread.sleep(400);
        log.info("work_queue隊(duì)列消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
}

5.【工作隊(duì)列模式(Work Queues)】

使用方式:

工作隊(duì)列模式也稱為任務(wù)隊(duì)列(Task Queues),它可以用來實(shí)現(xiàn)任務(wù)的異步處理。多個(gè)工作者(Worker)同時(shí)監(jiān)聽同一個(gè)隊(duì)列,當(dāng)有新的任務(wù)消息被發(fā)送到隊(duì)列中時(shí),空閑的工作者會獲取并處理這些任務(wù),確保任務(wù)能夠并行處理而不會重復(fù)執(zhí)行。

代碼示例: 生產(chǎn)者

/**
 * 發(fā)布訂閱
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  fanout
 */
@GetMapping("publish/subscribe")
public void publishSubscribe() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    如果是發(fā)布訂閱模式 那么這個(gè)規(guī)則默認(rèn)不寫 只需要交換機(jī)和隊(duì)列綁定即可不需要規(guī)則
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("publish_subscribe_exchange", "",
            "hello publisher subscribe!!");
}

消費(fèi)者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PublisherSubscribeConsumer {
    private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);
    /**
     * 發(fā)布訂閱模型消費(fèi)者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),
            exchange = @Exchange(name = "publish_subscribe_exchange",
                    type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe(String message) {
        log.info("發(fā)布訂閱模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 發(fā)布訂閱模型消費(fèi)者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),
            exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe2(String message) {
        log.info("發(fā)布訂閱模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
}

6.【路由模式(Routing)】

使用方式:

路由模式允許發(fā)送者根據(jù)消息的路由鍵(Routing Key)將消息路由到特定的隊(duì)列。發(fā)送者將消息發(fā)送到交換機(jī),并且通過設(shè)置不同的路由鍵,使消息能夠被交換機(jī)路由到不同的隊(duì)列。消費(fèi)者可以根據(jù)需要選擇監(jiān)聽哪些隊(duì)列來接收消息。

代碼示例: 生產(chǎn)者

/**
 * 路由模型
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  direct
 */
@GetMapping("routing")
public void routing() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    字符串 隨意
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("routing_exchange", "aaa",
            "hello routing!!");
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class RoutingConsumer {
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "abc", "error", "info" }))
    public void routingConsumer(String message) {
        log.info("路由模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "aaa", "ccc", "waadaffas" }))
    public void routingConsumer2(String message) {
        log.info("路由模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "bbbb", "asdfasd", "asdfasdf" }))
    public void routingConsumer3(String message) {
        log.info("路由模型消費(fèi)者3接收到了消息,消息內(nèi)容:{}", message);
    }
}

7.【主題模式(Topics)】

使用方式:

主題模式是路由模式的一種擴(kuò)展,它允許發(fā)送者根據(jù)消息的多個(gè)屬性(如主題)將消息路由到一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)(Topic Exchange)使用通配符匹配路由鍵與隊(duì)列綁定鍵的模式,從而實(shí)現(xiàn)更靈活的消息路由和過濾。

代碼示例: 生產(chǎn)者

/**
 * 主題模型
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  topic
 */
@GetMapping("topic")
public void topic() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    多個(gè)單詞  以 “.” 拼起來
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name",
            "hello topic!!");
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class TopicConsumer {
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "error.*.info", "#.name" }))
    public void topicConsumer(String message) {
        log.info("xxxxxxxxx1");
    }
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "username" }))
    public void topicConsumer2(String message) {
        log.info("xxxxxxxxx2");
    }
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "bwie.*", "error.*.info" }))
    public void topicConsumer3(String message) {
        log.info("xxxxxxxxx3");
    }
}

到此這篇關(guān)于SpringBoot 整合 RabbitMQ 的使用的文章就介紹到這了,更多相關(guān)SpringBoot 整合 RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論