欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot RabbitMQ 消息隊列使用示例詳解

 更新時間:2024年06月05日 10:12:34   作者:bj_wasin  
本文通過示例代碼介紹了Springboot RabbitMQ 消息隊列使用,對大家的學習或工作具有一定的參考借鑒價值,感興趣的朋友跟隨小編一起看看吧

一、概念介紹:

RabbitMQ中幾個重要的概念介紹:

  • Channels:信道,多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的 TCP 連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
  • Exchanges:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務器中的隊列。
  • 交換機類型主要有以下幾種:
  • Direct Exchange(直連交換機):這種類型的交換機根據(jù)消息的Routing Key(路由鍵)進行精確匹配,只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景。
  • Fanout Exchange(扇形交換機):這種類型的交換機采用廣播模式,它會將消息發(fā)送給所有綁定到該交換機的隊列,不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。
  • Topic Exchange(主題交換機):這種類型的交換機支持基于模式匹配的路由鍵,可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現(xiàn)更復雜的消息路由邏輯。
  • Headers Exchange(頭交換機):這種類型的交換機不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進行匹配。適用于需要在消息頭中攜帶額外信息的場景。
  • Queues:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

二、引入依賴:

 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

三、添加配置信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手動提交

四、Direct Exchange(直連交換機)模式

1、新建配置文件 RabbitDirectConfig類

package com.example.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 直連交換機--這種類型的交換機根據(jù)消息的Routing Key(路由鍵)進行精確匹配,
 * 只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景
 */
@Configuration
public class RabbitDirectConfig {
    /**
     * 隊列名稱
     */
    public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";
    public static final String QUEUE_USER ="QUEUE_USER";
    /**
     * 交換機
     */
    public static final String EXCHANGE="EXCHANGE_01";
    /**
     * 路由
     */
    public static final String ROUTING_KEY="ROUTING_KEY_01";
    @Bean
    public Queue queue01() {
        return new Queue(QUEUE_MESSAGE, //隊列名稱
                true, //是否持久化
                false, //是否排他
                false //是否自動刪除
        );
    }
    @Bean
    public Queue queue02() {
        return new Queue(QUEUE_USER, //隊列名稱
                true, //是否持久化
                false, //是否排他
                false //是否自動刪除
        );
    }
    @Bean
    public DirectExchange exchange01() {
        return new DirectExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding demoBinding() {
        return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);
    }
    @Bean
    public Binding demoBinding2() {
        return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);
    }
}

2、添加消息生產(chǎn)者 Producer類

package com.example.direct;
import com.example.entity.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class Producer {
    @Resource
    RabbitTemplate rabbitTemplate;
    public void sendMessageByExchangeANdRoute(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);
    }
    /**
     * 默認交換器,隱式地綁定到每個隊列,路由鍵等于隊列名稱。
     * @param message
     */
    public void sendMessageByQueue(String message){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);
    }
    public void sendMessage(User user){
        rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);
    }
}

3、添加消息消費者

package com.example.direct;
import com.example.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class Consumer {
    @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)
    public void onMessage(User user){
        System.out.println("收到的實體bean消息:"+user);
    }
    @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)
    public void onMessage2(String message){
        System.out.println("收到的字符串消息:"+message);
    }
}

4、 測試

package com.example;
import com.example.entity.User;
import com.example.direct.Producer;
import com.example.fanout.FanoutProducer;
import com.example.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class SpringbootRabbitMqApplicationTests {
    @Resource
    Producer producer;
    @Test
    public void sendMessage() throws InterruptedException {
        producer.sendMessageByQueue("哈哈");
        producer.sendMessage(new User().setAge(10).setName("wasin"));
    }
}

五、Topic Exchange(主題交換機)模式

1、新建RabbitTopicConfig類

package com.example.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 主題交換機--這種類型的交換機支持基于模式匹配的路由鍵,
 * 可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現(xiàn)更復雜的消息路由邏輯。
 */
@Configuration
public class RabbitTopicConfig {
    /**
     * 交換機
     */
    public static final String EXCHANGE = "EXCHANGE_TOPIC1";
    /**
     * 隊列名稱
     */
    public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";
    /**
     * 路由
     * "*" 與 "#",用于做模糊匹配。其中 "*" 用于匹配一個單詞,"#" 用于匹配多個單詞(可以是零個)
     * 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....
     * aa.bb.wasin.cc 無法匹配
     */
    public static final String ROUTING_KEY1 = "*.wasin.#";
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_TOPIC1, //隊列名稱
                true, //是否持久化
                false, //是否排他
                false //是否自動刪除
        );
    }
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);
    }
}

2、新建 消息生產(chǎn)者和發(fā)送者

TopicProducer類

package com.example.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class TopicProducer {
    @Resource
    RabbitTemplate rabbitTemplate;
    /**
     * @param routeKey 路由
     * @param message 消息
     */
    public void sendMessageByQueue(String routeKey, String message){
        rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);
    }
}

TopicConsumer類

package com.example.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Slf4j
@Component
public class TopicConsumer {
    @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)
    public void onMessage2(String message){
        log.info("topic收到的字符串消息:{}",message);
    }
}

六、Fanout Exchange(扇形交換機)模式

1、 新建 RabbitFanoutConfig類

package com.example.fanout;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description: 扇形交換機--這種類型的交換機采用廣播模式,它會將消息發(fā)送給所有綁定到該交換機的隊列,
 * 不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。
 */
@Configuration
public class RabbitFanoutConfig {
    /**
     * 交換機
     */
    public static final String EXCHANGE = "EXCHANGE_FANOUT";
    /**
     * 隊列名稱
     */
    public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";
    /**
     * 隊列名稱
     */
    public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";
    @Bean
    public Queue queueFanout1() {
        return new Queue(QUEUE_FANOUT1, //隊列名稱
                true, //是否持久化
                false, //是否排他
                false //是否自動刪除
        );
    }
    @Bean
    public Queue queueFanout2() {
        return new Queue(QUEUE_FANOUT2, //隊列名稱
                true, //是否持久化
                false, //是否排他
                false //是否自動刪除
        );
    }
    @Bean
    public FanoutExchange exchangeFanout() {
        return new FanoutExchange(EXCHANGE,
                true, //是否持久化
                false //是否排他
        );
    }
    @Bean
    public Binding bindingFanout() {
        return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
    }
    @Bean
    public Binding bindingFanout2() {
        return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
    }
}

2、新建 消息生產(chǎn)者和發(fā)送者

FanoutProducer類:

package com.example.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Component
public class FanoutProducer {
    @Resource
    RabbitTemplate rabbitTemplate;
    /**
     * @param message 消息
     */
    public void sendMessageByQueue(String message) {
        rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);
    }
}

FanoutConsumer類

package com.example.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * @author wasin
 * @version 1.0
 * @date 2024/6/4
 * @description:
 */
@Slf4j
@Component
public class FanoutConsumer {
    /**
     * 手動提交
     * @param message
     * @param channel
     * @param tag
     * @throws IOException
     */
    @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)
    public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("fanout1收到的字符串消息:{}",message);
        channel.basicAck(tag,false);
    }
    @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)
    public void onMessage2(String message){
        log.info("fanout2到的字符串消息:{}",message);
    }
}

到此這篇關(guān)于Springboot RabbitMQ 消息隊列使用的文章就介紹到這了,更多相關(guān)Springboot RabbitMQ 消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java遞歸造成的堆棧溢出問題及解決方案

    Java遞歸造成的堆棧溢出問題及解決方案

    在Java中,遞歸造成的堆棧溢出問題通常是因為遞歸調(diào)用的深度過大,導致調(diào)用??臻g不足,解決這類問題的一種常見方法是使用非遞歸的方式重寫算法,即使用迭代替代遞歸,需要的朋友可以參考下
    2024-08-08
  • Java OpenCV利用KNN算法實現(xiàn)圖像背景移除

    Java OpenCV利用KNN算法實現(xiàn)圖像背景移除

    這篇文章主要為大家介紹了Java OpenCV利用K最鄰近(KNN,K-NearestNeighbor)分類算法實現(xiàn)圖像背景移除的示例代碼,需要的可以參考一下
    2022-01-01
  • 深入剖析Java ArrayQueue(JDK)的源碼

    深入剖析Java ArrayQueue(JDK)的源碼

    本篇文章主要給大家介紹一個比較簡單的JDK為我們提供的容器ArrayQueue,這個容器主要是用數(shù)組實現(xiàn)的一個單向隊列,整體的結(jié)構(gòu)相對其他容器來說就比較簡單了,感興趣的可以了解一下
    2022-08-08
  • springboot如何使用自定義的aspect

    springboot如何使用自定義的aspect

    AOP面向切面編程在Spring Boot中實現(xiàn),通過在方法調(diào)用前后加入固定邏輯,實現(xiàn)橫切關(guān)注點的模塊化,主要涉及概念有:切面(Aspect)、連接點(Jointpoint)、通知(Advice)、切入點(Pointcut)、引入(Introduction)、目標對象(Targetobject)
    2024-11-11
  • 通過Java實現(xiàn)文件斷點續(xù)傳功能

    通過Java實現(xiàn)文件斷點續(xù)傳功能

    用戶上傳大文件,網(wǎng)絡(luò)差點的需要歷時數(shù)小時,萬一線路中斷,不具備斷點續(xù)傳的服務器就只能從頭重傳,而斷點續(xù)傳就是,允許用戶從上傳斷線的地方繼續(xù)傳送,這樣大大減少了用戶的煩惱。本文將用Java語言實現(xiàn)斷點續(xù)傳,需要的可以參考一下
    2022-05-05
  • Spring web集成rabbitmq代碼實例

    Spring web集成rabbitmq代碼實例

    這篇文章主要介紹了Spring web集成rabbitmq代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • Java異常處理操作 Throwable、Exception、Error

    Java異常處理操作 Throwable、Exception、Error

    這篇文章主要介紹了Java異常處理操作 Throwable、Exception、Error,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 深入理解Spring MVC概要與環(huán)境配置

    深入理解Spring MVC概要與環(huán)境配置

    本篇文章主要介紹了深入理解Spring MVC概要與環(huán)境配置 ,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • Mybatis3 if判斷字符串變態(tài)寫法

    Mybatis3 if判斷字符串變態(tài)寫法

    這篇文章主要介紹了Mybatis3 if判斷字符串變態(tài)的寫法,非常不錯,具有參考借鑒價值,需要的朋友參考下
    2017-01-01
  • Spring Boot多模塊化后,服務間調(diào)用的坑及解決

    Spring Boot多模塊化后,服務間調(diào)用的坑及解決

    這篇文章主要介紹了Spring Boot多模塊化后,服務間調(diào)用的坑及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06

最新評論