欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成MQ的過程(四種交換機(jī)的實(shí)例)

 更新時(shí)間:2025年03月12日 15:16:58   作者:Eliauk544  
本文介紹了RabbitMQ中四種交換機(jī)(直連、扇出、主題和頭交換機(jī))的使用方法,包括路由機(jī)制、典型場景和實(shí)現(xiàn)步驟,通過創(chuàng)建SpringBoot項(xiàng)目并配置交換機(jī)、隊(duì)列和消費(fèi)者,展示了如何發(fā)送和接收消息,每種交換機(jī)的示例代碼和測試步驟也一并提供,感興趣的朋友一起看看吧

?RabbitMQ交換機(jī)(Exchange)的核心作用

在RabbitMQ中,?交換機(jī) 是消息路由的核心組件,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)規(guī)則(如路由鍵、頭信息等)將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中。
不同交換機(jī)類型決定了消息的路由邏輯,使用不同的交換機(jī)在不同的場景下可以提高消息系統(tǒng)的高可用性。

1. 直連交換機(jī)(Direct Exchange)?

?路由機(jī)制 ?

  • 精確匹配路由鍵(Routing Key)?:消息會(huì)被發(fā)送到與 Routing Key ?完全匹配 的隊(duì)列。
  • ?典型場景:一對(duì)一或一對(duì)多的精確消息分發(fā)。

應(yīng)用場景 ?

  • 任務(wù)分發(fā):如訂單處理系統(tǒng),根據(jù)訂單類型(如 order.payment、order.shipping)分發(fā)到不同隊(duì)列。
  • ?日志分類:將不同級(jí)別的日志(log.error、log.info)路由到對(duì)應(yīng)的處理服務(wù)。

 使用直連交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收

1.創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下:

server:
  port: 8021
spring: 
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服務(wù)器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

 2.初始化隊(duì)列和交換機(jī),并進(jìn)行綁定

package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 功能:
 * 作者:程序員ZXY
 * 日期:2025/3/8 下午1:55
 */
@Configuration
public class DirectRabbitConfig {
    @Bean
    public  Queue TestDirectQueue(){
        return new Queue("TestDirectQueue",true);
    }
    @Bean
    DirectExchange TestDirectExchange(){
        return new DirectExchange("TestDirectExchange",true,false);
    }
    @Bean
    Binding bindingDirect(){
        return BindingBuilder.bind(TestDirectQueue())
                .to(TestDirectExchange())
                .with("TestDirectRouting");
    }
} 

 3.實(shí)現(xiàn)sendDirectMessage發(fā)送消息請(qǐng)求,由生產(chǎn)者發(fā)送到MQ,TestDirectRouting作為Key,用于精確轉(zhuǎn)發(fā)。

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * 功能:
 * 作者:程序員ZXY
 * 日期:2025/3/8 下午2:12
 */
@RestController
public class SendMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Hello MQ!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "OK";
    }
}

4.此時(shí)就可以啟動(dòng)項(xiàng)目發(fā)送消息了,使用PostMan發(fā)送消息,返回OK說明發(fā)送成功

5.進(jìn)入http://localhost:15672/,可以看到消息發(fā)送成功,我這里是請(qǐng)求了兩次(也就是發(fā)了兩條消息)。

6.接下來寫消費(fèi)者的消費(fèi)過程,新創(chuàng)建一個(gè)SpringBoot項(xiàng)目,在yml文件配置如下

server:
  port: 8022
spring:
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服務(wù)器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

7.消費(fèi)者配置類,同樣TestDirectRouting用于唯一識(shí)別Key

package com.atguigu.demomq2;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 功能:
 * 作者:程序員ZXY
 * 日期:2025/3/8 下午 
 */
@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue",true);
    }
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange");
    }
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

8.消費(fèi)者 接收消息@RabbitListener(queues = "TestDirectQueue")用于監(jiān)聽指定隊(duì)列發(fā)送的消息

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費(fèi)者收到消息  : " + testMessage.toString());
    }
}

 9.啟動(dòng)消費(fèi)者,成功接收消息

10.查看MQ控制臺(tái),消息成功被消費(fèi) 

2. 扇出交換機(jī)(Fanout Exchange)? ?

路由機(jī)制(一個(gè)交換機(jī)轉(zhuǎn)發(fā)到多個(gè)隊(duì)列)

  • 廣播模式:忽略 Routing Key,將消息發(fā)送到所有綁定的隊(duì)列。
  • ?典型場景:消息的全局通知或并行處理。

?應(yīng)用場景

  • ?實(shí)時(shí)通知系統(tǒng):如用戶注冊(cè)成功后,同時(shí)發(fā)送郵件、短信、更新緩存。
  • ?日志廣播:多個(gè)服務(wù)訂閱同一日志源,各自獨(dú)立處理。

 使用扇出交換機(jī)實(shí)現(xiàn)消息發(fā)送和接收

1.扇出交換機(jī)配置

package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {
    // 定義扇出交換機(jī)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.user.register", true, false);
    }
    // 定義郵件隊(duì)列
    @Bean
    public Queue emailQueue() {
        return new Queue("fanout.user.email", true);
    }
    // 定義短信隊(duì)列
    @Bean
    public Queue smsQueue() {
        return new Queue("fanout.user.sms", true);
    }
    // 綁定所有隊(duì)列到扇出交換機(jī)(無需路由鍵)
    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
}

2.生產(chǎn)者

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class FanoutUserService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendFanoutMessage")
    public String sendRegisterBroadcast() {
        rabbitTemplate.convertAndSend(
            "fanout.user.register", 
            "", // 扇出交換機(jī)忽略路由鍵
            "message MQ"
        );
        return "OK Fan";
    }
}

3.消費(fèi)者

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutNotificationConsumer {
    @RabbitListener(queues = "fanout.user.email")
    public void handleEmail(String message) {
        System.out.println("[Email] Received: " + message);
    }
    @RabbitListener(queues = "fanout.user.sms")
    public void handleSms(String message) {
        System.out.println("[SMS] Received: " + message);
    }
}

4.請(qǐng)求并查看消費(fèi)結(jié)果 

可以看到一個(gè)交換機(jī)完成消費(fèi)兩條消息 

?3. 主題交換機(jī)(Topic Exchange)?

  • ?路由機(jī)制 ?模式匹配路由鍵:使用 *(匹配一個(gè)單詞)和 #(匹配多個(gè)單詞)通配符。?
  • 典型場景:靈活的多條件消息路由。 ?

應(yīng)用場景

  • ?新聞?dòng)嗛喯到y(tǒng):用戶訂閱特定主題(如 news.sports.*、news.tech.#)。?
  • 設(shè)備狀態(tài)監(jiān)控:根據(jù)設(shè)備類型和區(qū)域路由消息(如 sensor.temperature.room1)。

1.配置主題交換機(jī)

package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicExchangeConfig {
    // 定義主題交換機(jī)
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.news", true, false);
    }
    // 定義體育新聞隊(duì)列
    @Bean
    public Queue sportsQueue() {
        return new Queue("topic.news.sports", true);
    }
    // 定義科技新聞隊(duì)列
    @Bean
    public Queue techQueue() {
        return new Queue("topic.news.tech", true);
    }
    // 綁定體育隊(duì)列:匹配 news.sports.*
    @Bean
    public Binding sportsBinding() {
        return BindingBuilder.bind(sportsQueue())
                .to(topicExchange())
                .with("news.sports.*");
    }
    // 綁定科技隊(duì)列:匹配 news.tech.#
    @Bean
    public Binding techBinding() {
        return BindingBuilder.bind(techQueue())
                .to(topicExchange())
                .with("news.tech.#");
    }
}

2.生產(chǎn)者

package com.atguigu.demomq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TopicNewsService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendTopicMessage1")
    public String  sendSportsNews() {
        rabbitTemplate.convertAndSend(
            "topic.news", 
            "news.sports.football",
                "* message:news.sports.football"
        );
        return "*OK";
    }
    @GetMapping("/sendTopicMessage2")
    public String sendTechNews() {
        rabbitTemplate.convertAndSend(
            "topic.news", 
            "news.tech.ai.abc.123456",
            "# message:news.tech.ai.abc.123456"
        );
        return "#OK";
    }
}

3. 消費(fèi)者

package com.atguigu.demomq2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicNewsConsumer {
    @RabbitListener(queues = "topic.news.sports")
    public void handleSports(String message) {
        System.out.println("[Sports] Received: " + message);
    }
    @RabbitListener(queues = "topic.news.tech")
    public void handleTech(String message) {
        System.out.println("[Tech] Received: " + message);
    }
}

4.發(fā)送請(qǐng)求

 可以看到消息成功消費(fèi),第一個(gè)為*通配符,第二個(gè)為#通配符

?4. 頭交換機(jī)(Headers Exchange)?

?路由機(jī)制( 我的理解是一種基于 ?多條件組合 的消息路由機(jī)制 ?

  • ?基于消息頭(Headers)匹配:忽略 Routing Key,通過鍵值對(duì)(Headers)匹配隊(duì)列綁定的條件。
  • ?匹配規(guī)則x-match 參數(shù)設(shè)為 all(需全部匹配)或 any(匹配任意一個(gè))。

?應(yīng)用場景

  • ?復(fù)雜路由邏輯:如根據(jù)消息的版本號(hào)、語言等元數(shù)據(jù)路由。?
  • 多維度過濾:如同時(shí)匹配用戶類型(user_type: vip)和地理位置(region: asia)。

1.頭交換機(jī)配置

package com.atguigu.demomq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersExchangeConfig {
    // 定義頭交換機(jī)
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers.user", true, false);
    }
    // 定義VIP用戶隊(duì)列
    @Bean
    public Queue vipQueue() {
        return new Queue("headers.user.vip", true);
    }
    // 綁定VIP隊(duì)列,要求同時(shí)匹配 userType=vip 和 region=asia
    @Bean
    public Binding vipBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("userType", "vip");
        headers.put("region", "asia");
        return BindingBuilder.bind(vipQueue())
                .to(headersExchange())
                .whereAll(headers).match(); // whereAll 表示需全部匹配
    }
}

2.生產(chǎn)者

package com.atguigu.demomq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HeaderUserVipService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendHeaderMessage")
    public String  sendVipMessage() {
        MessageProperties props = new MessageProperties();
        props.setHeader("userType", "vip");
        props.setHeader("region", "asia");
        Message msg = new Message("HeaderMessage".getBytes(), props);
        rabbitTemplate.send("headers.user", "", msg);
        return "OK";
    }
}

3.消費(fèi)者

package com.atguigu.demomq2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HeaderUserVipConsumer {
    @RabbitListener(queues = "headers.user.vip")
    public void handleVip(Message message) {
        String body = new String(message.getBody());
        System.out.println("[VIP] Received: " + body);
    }
}

4.PostMan測試 

這里僅消費(fèi)交換機(jī)初始化時(shí)滿足所有設(shè)定條件的消息,我們可以測試一下不滿足條件時(shí)發(fā)送消息

消費(fèi)者不消費(fèi)消息 

總結(jié) 

需要代碼自己進(jìn)行測試的 可以Git自取

git clone https://gitee.com/myselfzxy/mq-producer.git

git clone https://gitee.com/myselfzxy/mq-customer.git

到此這篇關(guān)于SpringBoot集成MQ,四種交換機(jī)的實(shí)例的文章就介紹到這了,更多相關(guān)SpringBoot集成MQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例

    java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例

    在本篇文章里小編給大家分享了關(guān)于java中Class類的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2021-05-05
  • MybatisX無法自動(dòng)生成entity實(shí)體類的解決方法

    MybatisX無法自動(dòng)生成entity實(shí)體類的解決方法

    本文主要介紹了MybatisX無法自動(dòng)生成entity實(shí)體類的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • 基于Spring p標(biāo)簽和c標(biāo)簽注入方式

    基于Spring p標(biāo)簽和c標(biāo)簽注入方式

    這篇文章主要介紹了Spring p標(biāo)簽和c標(biāo)簽注入方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java編程語言特性和優(yōu)勢

    Java編程語言特性和優(yōu)勢

    這篇文章主要介紹了Java編程語言特性和優(yōu)勢,Java是一門面向?qū)ο缶幊陶Z言,不僅吸收了C++語言的各種優(yōu)點(diǎn),還摒棄了C++里難以理解的多繼承、指針等概念,同時(shí)也增加了垃圾回收機(jī)制,釋放掉不被使用的內(nèi)存空間,解決了管理內(nèi)存空間的煩惱,下面來聊聊Java編程語言特性和優(yōu)勢吧
    2022-01-01
  • Java實(shí)現(xiàn)批量化操作Excel文件的示例代碼

    Java實(shí)現(xiàn)批量化操作Excel文件的示例代碼

    在操作Excel的場景中,通常會(huì)有一些針對(duì)Excel的批量操作,這篇文章主要為大家詳細(xì)介紹了如何使用GcExcel實(shí)現(xiàn)批量化操作Excel,感興趣的可以了解一下
    2024-12-12
  • 在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程

    在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程

    這篇文章主要介紹了在IDEA中配置tomcat并創(chuàng)建tomcat項(xiàng)目的圖文教程,需要的朋友可以參考下
    2020-07-07
  • 基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)

    基于Eclipse 的JSP/Servlet的開發(fā)環(huán)境的搭建(圖文)

    本文將會(huì)詳細(xì)地展示如何搭建JSP的開發(fā)環(huán)境。本次教程使用的是最新版的Eclipse 2018-09編輯器和最新版的Apache Tomcat v9.0,步驟詳細(xì),內(nèi)容詳盡,適合零基礎(chǔ)學(xué)者作為學(xué)習(xí)參考
    2018-12-12
  • springboot如何統(tǒng)一設(shè)置時(shí)區(qū)

    springboot如何統(tǒng)一設(shè)置時(shí)區(qū)

    這篇文章主要介紹了springboot如何統(tǒng)一設(shè)置時(shí)區(qū)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-01-01
  • 詳解使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離

    詳解使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離

    本篇文章主要介紹了使用spring aop實(shí)現(xiàn)業(yè)務(wù)層mysql 讀寫分離,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-01-01
  • Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問題解決辦法

    Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問題解決辦法

    這篇文章主要介紹了Knife4j的請(qǐng)求示例當(dāng)中有很多空白行的問題解決辦法,按正常來說不應(yīng)該有上方的空白,當(dāng)然如果只是查看我也不至于非要解決他,主要是假如接口是json傳參,調(diào)試界面都沒辦法修改參數(shù),遇到同樣問題的同學(xué)可以參考閱讀本文
    2024-09-09

最新評(píng)論