欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

微服務架構之使用RabbitMQ進行異步處理方式

 更新時間:2025年02月05日 14:41:22   作者:記得開心一點嘛  
本文介紹了RabbitMQ的基本概念、異步調用處理邏輯、RabbitMQ的基本使用方法以及在Spring Boot項目中使用RabbitMQ解決高并發(fā)問題,RabbitMQ是一種流行的消息隊列實現(xiàn),支持異步通信,可以有效解耦應用程序的不同部分,并將任務分發(fā)給多個消費者

一.什么是RabbitMQ?

RabbitMQ 是一種流行的消息隊列(Message Queue)實現(xiàn),基于 AMQP 協(xié)議(Advanced Message Queuing Protocol)。它支持異步通信,使多個系統(tǒng)之間以非阻塞的方式交換數(shù)據(jù)。

在我們使用微服務的時候,微服務一旦拆分,必然涉及到服務之間的相互調用,目前我們服務之間調用采用的都是基于 OpenFeign 的調用。這種調用中,調用者發(fā)起請求后需要等待服務提供者執(zhí)行業(yè)務返回結果后,才能繼續(xù)執(zhí)行后面的業(yè)務。也就是說調用者在調用過程中處于阻塞狀態(tài),因此我們稱這種調用方式為同步調用,也可以叫同步通。

如果我們的業(yè)務需要實時得到服務提供方的響應,則應該選擇同步通訊(同步調用)。而如果我們追求更高的效率,并且不需要實時響應,則應該選擇異步通訊(異步調用)。

二.異步調用處理邏輯:

異步調用方式其實就是基于消息通知的方式,一般包含三個角色:

  • 消息發(fā)送者:投遞消息的人,就是原來的調用方
  • 消息Broker:管理、暫存、轉發(fā)消息,你可以把它理解成微信服務器
  • 消息接收者:接收和處理消息的人,就是原來的服務提供方

除此之外還有:

  • Exchange(交換機):用于路由消息。RabbitMQ 有多種交換機類型(如 direct、topic、fanout、headers),它們決定了消息如何被傳遞到隊列。
  • Binding(綁定):連接交換機和隊列的規(guī)則。

在異步調用中,發(fā)送者不再直接同步調用接收者的業(yè)務接口,而是發(fā)送一條消息投遞給消息Broker。然后接收者根據(jù)自己的需求從消息Broker那里訂閱消息。每當發(fā)送方發(fā)送消息后,接受者都能獲取消息并處理。這樣,發(fā)送消息的人和接收消息的人就完全解耦了。

異步調用的優(yōu)勢包括:

  • 耦合度更低
  • 性能更好
  • 業(yè)務拓展性強
  • 故障隔離,避免級聯(lián)失敗

當然,異步通信也并非完美無缺,它存在下列缺點:

  • 完全依賴于Broker的可靠性、安全性和性能
  • 架構復雜,后期維護和調試麻煩

三.RabbitMQ的基本使用

下面是RabbitMQ的官網(wǎng):https://www.rabbitmq.com/

1.安裝

首先將RabbitMQ的鏡像拉取下來,然后運行下面命令:

docker run \
 -e RABBITMQ_DEFAULT_USER=hmall \
 -e RABBITMQ_DEFAULT_PASS=123 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

docker run 命令中,您將容器的兩個端口暴露到主機上:

  • -p 15672:15672:將容器內部的 15672 端口映射到主機的 15672 端口。這個端口通常用于 RabbitMQ 的 Web 管理控制臺(Management Console)。
  • -p 5672:5672:將容器內部的 5672 端口映射到主機的 5672 端口。這個端口用于 AMQP 協(xié)議,即客戶端與 RabbitMQ 進行消息傳輸?shù)亩丝凇?/li>

隨后我們訪問http://虛擬機IP地址:15672來打開RabbitMQ的控制臺,默認賬號密碼是 guest / guest。

在控制臺上主要可以關注三個信息:Exchanges(交換機),Queues(隊列),Admin(用戶管理)。

2.架構圖

其中包含幾個概念:

  • publisher:生產(chǎn)者,也就是發(fā)送消息的一方
  • consumer:消費者,也就是消費消息的一方
  • queue:隊列,存儲消息。生產(chǎn)者投遞的消息會暫存在消息隊列中,等待消費者處理
  • exchange:交換機,負責消息路由。生產(chǎn)者發(fā)送的消息由交換機決定投遞到哪個隊列。
  • virtual host:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue

RabbitMQ 使用 AMQP 協(xié)議,核心的消息模型包括:

  1. Producer 將消息發(fā)送到 Exchange。
  2. Exchange 將消息路由到 Queue。
  3. ConsumerQueue 獲取消息進行處理。

RabbitMQ 消息生命周期

  1. 消息發(fā)布:Producer 將消息發(fā)送到 RabbitMQ。
  2. 消息存儲:消息通過交換機路由到一個或多個隊列,隊列暫存這些消息。
  3. 消息消費:Consumer 從隊列中獲取并處理消息。

3.RabbitMQ控制臺的使用

RabbitMQ 中,交換機(Exchange)隊列(Queue) 是核心概念。它們之間的關系決定了消息的路由和存儲方式。

(1)Exchanges 交換機

  • 交換機是 消息的路由器,負責決定消息應該被發(fā)送到哪個隊列。
  • 生產(chǎn)者將消息發(fā)送給交換機,而不是直接發(fā)送到隊列。
  • 交換機根據(jù)路由規(guī)則 決定消息的走向(即發(fā)往哪些隊列)。

Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!

RabbitMQ 提供了四種常用的交換機類型,每種類型的路由規(guī)則不同:

  • Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機。
  • Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊列
  • Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符,可以進行模糊匹配。
  • Headers:頭匹配,基于MQ的 header 信息來路由消息,用的較少。

我們可以再這里創(chuàng)建交換機,Name表示創(chuàng)建的交換機的名字,Type表示可以選擇交換機的四種類型。創(chuàng)建成功后就可以在上面看到創(chuàng)建的交換機名字:

比如我們點擊amq.fanout查看交換機數(shù)據(jù)并且可以發(fā)送消息給消費者。

注意?。。∪绻覀儾粚⒔粨Q機指定隊列的話,由于沒有消費者存在,最終消息丟失了,這樣說明交換機沒有存儲消息的能力。

所以下面我們要先創(chuàng)建隊列,然后讓生產(chǎn)者推送的消息經(jīng)過交換機的傳遞后,到達消息隊列,然后再給消費者。所以生產(chǎn)者無需知道隊列的存在以此來達到解耦的效果。

(2)Queues 隊列

  • 隊列用于 存儲消息,直到消費者消費它們。
  • 隊列與消費者一一對應,即一個消費者從一個隊列讀取消息。
  • 隊列按 FIFO(First In, First Out)的順序存儲消息。

交換機與隊列的關系:

交換機(Exchange)是消息的路由器,它決定了將消息發(fā)送到哪個隊列。隊列(Queue)是消息的存儲和處理地方。交換機本身并不知道具體的隊列,它只是通過綁定(Binding)決定消息應該被路由到哪些隊列。

交換機將消息通過路由鍵(Routing Key)發(fā)送到綁定的隊列,但交換機和隊列之間的連接并不是自動的,需要顯式地設置綁定。綁定指定了 交換機隊列 之間的關系,以及 路由規(guī)則(例如,路由鍵匹配的規(guī)則)。

在這里我們填寫隊列名字即可,其他暫時可以不用填寫。

隨后我們向交換機進行綁定(bind)隊列,隨后通過隊列傳輸給消費者。

這里的Routing key的出現(xiàn)是為了讓 Direct (交換機的類型)能夠選擇隊列而存在的。

我們在綁定隊列完成后會出現(xiàn)下面這樣,這樣證明我們成功為交換機綁定好兩個隊列:

隨后我們在下面窗口推送消息:

(3)Admin

①Users(用戶管理):

管理 RabbitMQ 中的用戶賬號,在這里 添加、刪除用戶,并設置每個用戶的權限。

每個用戶可分配不同的 角色

  • administrator:管理員,具有所有權限。
  • monitoring:可以監(jiān)控和查看信息,但不能管理。
  • policymaker:可以設置策略和參數(shù)。
  • management:可以訪問管理界面但沒有策略權限。

  • Nameitheima,也就是用戶名
  • Tagsadministrator,說明itheima用戶是超級管理員,擁有所有權限
  • Can access virtual host/,可以訪問的virtual host,這里的/是默認的virtual host

②Virtual Hosts(虛擬主機):

將 RabbitMQ 服務器劃分為多個 虛擬主機(vhost),類似于一個獨立的命名空間。

  • 不同的應用可以使用不同的虛擬主機,彼此隔離。
  • 每個虛擬主機都有自己的 交換機、隊列和用戶權限。

四.SpringAMOP的使用

Spring AMQPSpring for Advanced Message Queuing Protocol)是 Spring 提供的一個消息隊列集成模塊,主要用于簡化與 RabbitMQ 的集成。它通過 AMQP 協(xié)議來實現(xiàn)消息的生產(chǎn)和消費。

  • Publisher:生產(chǎn)者,不再發(fā)送消息到隊列中,而是發(fā)給交換機
  • Exchange:交換機,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
  • Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。
  • Consumer:消費者,與以前一樣,訂閱隊列,沒有變化

1.導入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.添加配置

publisher以及consumer服務的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虛擬機IP
    port: 5672 # 端口
    virtual-host: /hmall # 虛擬主機
    username: hmall # 用戶名
    password: 123 # 密碼
    listener:
      simple:
        prefetch: 1 # (能者多勞)每次只能獲取一條消息,處理完成才能獲取下一個消息

5672 端口用于 AMQP 協(xié)議,通常用于客戶端與 RabbitMQ 進行消息傳輸。,所以這里port綁定的是5672。

在 RabbitMQ 中,guest 用戶默認只能在 本地 連接到 RabbitMQ 實例。如果你希望使用 guest 用戶進行遠程連接(即從非本地機器連接 RabbitMQ),RabbitMQ 默認是 不允許 的。

3.在publisher服務中利用RabbitTemplate實現(xiàn)消息發(fā)送

RabbitTemplate 解釋:

RabbitTemplate 是 Spring AMQP 的核心實現(xiàn)類,它實現(xiàn)了 AmqpTemplate 接口,并且對 AmqpTemplate 提供了一些更高層次的封裝,簡化了消息發(fā)送和接收的操作。

RabbitTemplate 是大多數(shù) Spring AMQP 用戶的首選,它提供了很多便捷的方法和默認的行為,使得消息交互變得更加簡單。

主要方法:

  • convertAndSend: 與 AmqpTemplate 相同,提供了消息轉換并發(fā)送的功能。
  • receiveAndConvert: 從隊列中接收消息并轉換成 Java 對象。
  • send: 發(fā)送消息,不進行轉換。
  • receive: 從隊列中接收消息。

RabbitTemplate 為發(fā)送消息提供了更豐富的功能,如消息轉換器、默認交換機支持等,通常適用于大多數(shù)使用 Spring 的場景。

(1) 發(fā)送消息(convertAndSend)

rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
  • exchange:指定交換機的名稱。
  • routingKey:消息的路由鍵。
  • message:消息體,可以是任何對象,RabbitTemplate 會自動將其轉換為消息格式(例如,JSON、文本等)。

此方法的作用是將消息發(fā)送到指定交換機,并且根據(jù)給定的路由鍵路由到相關隊列。

實例:

rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");

這里消息 "支付成功的消息" 被發(fā)送到 pay.topic 交換機,使用 pay.success 路由鍵路由到對應的隊列。

(2)接收消息(receiveAndConvert)

public <T> T receiveAndConvert(String queueName);
  • queueName:消息隊列的名稱。
  • 返回類型為 <T>,表示接收到的消息會被轉換為指定的類型。

receiveAndConvert 會從指定的隊列接收消息,并自動將消息轉換成目標對象類型。如果消息體不是可轉換的,方法將拋出異常。

String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue");
System.out.println("收到的消息: " + message);

這里,pay.success.queue 隊列中的消息將被接收,并自動轉換為 String 類型。

還有一種就是 receive 方法:

public Message receive(String queueName);
  • queueName:隊列名稱。
  • 返回的是 Message 類型,而不是直接轉換成對象。你可以通過 Message 對象獲取消息的內容和其他屬性。

實例:

Message message = rabbitTemplate.receive("pay.success.queue");
if (message != null) {
    System.out.println("收到的消息: " + new String(message.getBody()));
}

(3)發(fā)送消息并獲取響應(convertSendAndReceive)

public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
  • exchange:交換機名稱。
  • routingKey:路由鍵。
  • message:發(fā)送的消息對象。

convertSendAndReceive 方法既發(fā)送消息到交換機,也等待從隊列返回響應。它會根據(jù)指定的路由鍵將消息發(fā)送到交換機,并等待響應消息,然后將響應轉換為返回類型。

實例:

String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "請求處理");
System.out.println("收到響應: " + response);

(4)發(fā)送消息到特定隊列

public void send(String exchange, String routingKey, Message message);
  • exchange:交換機名稱。
  • routingKey:路由鍵。
  • message:消息對象(可以是 Message 類型)。

send 方法允許你將一個 Message 對象發(fā)送到指定的交換機,并根據(jù)路由鍵進行路由。

Message message = new Message("支付成功".getBytes());
rabbitTemplate.send("pay.topic", "pay.success", message);

(5)設置消息監(jiān)聽器

RabbitTemplate 本身并不直接處理消息監(jiān)聽(接收消息),但是可以通過設置 RabbitListener 來監(jiān)聽消息,并將其與 RabbitTemplate 配合使用。一般來說,消息接收和處理是在 @RabbitListener 注解的監(jiān)聽器方法中完成的。

@RabbitListener(queues = "pay.success.queue")
public void receiveMessage(String message) {
    System.out.println("接收到消息: " + message);
}

下面看一個代碼實例:

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 隊列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

4.定義消費者實現(xiàn)異步調用

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic"),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        //調用方法
        orderService.markOrderPaySuccess(orderId);
    }
}

①@RequiredArgsConstructor

這是 Lombok 提供的注解,自動為類中所有 final 修飾的字段生成一個包含這些字段的構造函數(shù)。

使用這個注解可以免去手動編寫構造函數(shù)的麻煩,尤其是在使用依賴注入時(例如注入 IOrderService)。

②@RabbitListener

@RabbitListener 注解用于監(jiān)聽來自 RabbitMQ 隊列的消息。

它會自動監(jiān)聽指定的隊列,當有消息到達時,會觸發(fā) listenPaySuccess 方法進行處理。

③QueueBinding(隊列綁定)

通過 @QueueBinding 注解,綁定了 隊列交換機,并指定了 路由鍵。

@Queue

  • name = "trade.pay.success.queue":指定隊列的名稱為 trade.pay.success.queue。
  • durable = "true":表示隊列是 持久化 的,即 RabbitMQ 重啟后隊列依然存在。
  • 隊列的作用:隊列是消息的臨時存儲地,消費者會從隊列中拉取消息并處理。

@Exchange

  • name = "pay.topic":指定交換機的名稱為 pay.topic,這是一個 Topic Exchange(主題交換機)。
  • 交換機的作用:交換機決定消息如何路由到隊列。Topic Exchange 可以根據(jù)路由鍵的匹配規(guī)則將消息路由到合適的隊列。

key = "pay.success"

  • 路由鍵:指定了路由鍵為 pay.success。這意味著當生產(chǎn)者發(fā)送的消息路由鍵是 pay.success 時,消息將被路由到 trade.pay.success.queue 隊列。

5.總流程處理過程

  • 生產(chǎn)者:在支付成功后,生產(chǎn)者會發(fā)送一條消息到 pay.topic 交換機,消息的路由鍵為 pay.success。
  • 交換機pay.topic 交換機會根據(jù)路由鍵 pay.success 將消息路由到 trade.pay.success.queue 隊列。
  • 消費者PayStatusListener 作為消費者監(jiān)聽 trade.pay.success.queue,當有消息到達隊列時,它會接收到訂單 ID 并調用訂單服務更新訂單狀態(tài)。

五.使用配置類管理定義交換機,隊列及兩者關系

在 Spring AMQP 中,交換機(Exchange)、隊列(Queue)、以及綁定(Binding)可以通過配置類來定義和管理。配置類可以幫助你靈活地創(chuàng)建和綁定交換機與隊列,并且可以根據(jù)業(yè)務需求自定義各種參數(shù)。

創(chuàng)建配置類效果展示:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 創(chuàng)建隊列
    @Bean
    public Queue queue() {
        return new Queue("trade.pay.success.queue", true); // durable=true 表示隊列持久化
    }

    // 創(chuàng)建交換機
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("pay.topic"); // 創(chuàng)建主題交換機
    }

    // 創(chuàng)建綁定關系(隊列與交換機通過 routing key 綁定)
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由鍵是 pay.success
    }
}

1.創(chuàng)建隊列:Queue

@Bean
public Queue queue() {
    return new Queue("trade.pay.success.queue", true);
}

作用:通過 @Bean 注解定義了一個隊列 Bean。Spring 容器會自動管理這個隊列,并在 RabbitMQ 上創(chuàng)建該隊列。

參數(shù)

  • "trade.pay.success.queue":這是隊列的名稱。每個隊列在 RabbitMQ 中必須有唯一的名稱。
  • true:表示這個隊列是 持久化 的。持久化的隊列在 RabbitMQ 服務重啟后依然存在。
  • 隊列還可以設置其他屬性,例如是否自動刪除(autoDelete)、是否排他性(exclusive)等,默認是 false。

2.創(chuàng)建交換機:Exchange

@Bean
public TopicExchange exchange() {
    return new TopicExchange("pay.topic");
}
  • 作用:通過 @Bean 注解定義了一個 Topic Exchange 類型的交換機。
  • 參數(shù)"pay.topic":這是交換機的名稱。同樣,交換機在 RabbitMQ 中也必須有唯一的名稱。TopicExchange 支持通過通配符(如 *#)進行更靈活的消息匹配。

Topic Exchange 是一種交換機類型,它允許使用通配符來進行路由。例如,路由鍵可以是 "pay.*",可以匹配 "pay.success""pay.failure"。在這里可以使用四種交換機類型來定義交換機,具體場景具體分析使用。

3.創(chuàng)建綁定關系:Binding

@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("pay.success");
}

作用:通過 @Bean 注解定義了隊列和交換機的綁定關系。這個綁定決定了消息在何種條件下會從交換機路由到隊列。

參數(shù)

  • queue:隊列 trade.pay.success.queue 被傳入,表示將該隊列與交換機綁定。
  • exchange:交換機 pay.topic 被傳入,表示將隊列與該交換機綁定。
  • .with("pay.success"):指定了路由鍵 pay.success,路由鍵 pay.success 會告訴交換機,如果生產(chǎn)者發(fā)送的消息帶有這個路由鍵,那么該消息就會路由到 trade.pay.success.queue 隊列。

4.多個隊列綁定到同一個交換機

我們可以將多個隊列綁定到同一個交換機,并使用不同的路由鍵。這樣可以實現(xiàn)根據(jù)不同的路由鍵來發(fā)送不同類型的消息到各自的隊列。

@Bean
public Queue paySuccessQueue() {
    return new Queue("pay.success.queue", true);
}

@Bean
public Queue payFailureQueue() {
    return new Queue("pay.failure.queue", true);
}

@Bean
public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
    return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
}

@Bean
public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
    return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
}
  • 在這個例子中,pay.success.queuepay.failure.queue 都綁定到同一個交換機 pay.topic,但使用不同的路由鍵。
  • 消息路由邏輯
    • 當生產(chǎn)者發(fā)送路由鍵為 pay.success 的消息時,消息會路由到 pay.success.queue 隊列。
    • 當生產(chǎn)者發(fā)送路由鍵為 pay.failure 的消息時,消息會路由到 pay.failure.queue 隊列。
  • BindingBuilder.bind(paySuccessQueue)paySuccessQueue 參數(shù)就是綁定到交換機的隊列對象,也就是 pay.success.queue。它會被傳遞給 BindingBuilder,并且指定此隊列和交換機的綁定關系。
  • .to(exchange):這里將隊列和交換機綁定,指定消息通過交換機路由到隊列。
  • .with("pay.success"):指定路由鍵為 pay.success,意思是只有路由鍵為 pay.success 的消息才會被路由到 paySuccessQueue 隊列中。

在 Spring AMQP 中,隊列對象(如 paySuccessQueue)是由 @Bean 注解的 Queue 類型方法返回的。Spring 會自動將返回的隊列對象放入到 Spring 容器中,并注入到需要它的地方。所以當我們在 Binding 中使用 paySuccessQueue 時,實際上是在引用之前構造并注冊到 Spring 容器中的隊列實例。

在 Spring 的上下文中,paySuccessQueuepayFailureQueue 是已經(jīng)被創(chuàng)建并管理的隊列對象,我們不需要手動創(chuàng)建隊列,只需要在 Binding 中通過引用這些對象來建立隊列與交換機之間的關系。

5.配置不同類型的交換機

除了 Topic Exchange,RabbitMQ 還支持其他幾種常見的交換機類型。這里分別演示如何創(chuàng)建 Direct ExchangeFanout ExchangeHeaders Exchange。

(1)Direct Exchange

@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange");
}

@Bean
public Binding directBinding(Queue queue, DirectExchange directExchange) {
    return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
}

Direct Exchange:直接交換機會根據(jù) 完全匹配的路由鍵 將消息發(fā)送到隊列。只有當消息的路由鍵和綁定的路由鍵 完全一致 時,消息才會被路由到指定隊列。

(2)Fanout Exchange

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}

@Bean
public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue).to(fanoutExchange);  // 不需要路由鍵
}

Fanout Exchange:扇出交換機會將消息發(fā)送到所有綁定的隊列,不需要考慮路由鍵。這個交換機通常用于廣播消息。

(3)Headers Exchange

@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

@Bean
public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
    return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
}

Headers Exchange:頭交換機根據(jù)消息頭的內容進行路由,而不是依賴路由鍵。適用于按消息的元數(shù)據(jù)進行路由的場景。

總結:

通過 Spring AMQP 的配置類,你可以非常靈活地定義 RabbitMQ 的 交換機、隊列綁定關系,并通過不同的路由鍵和交換機類型實現(xiàn)復雜的消息路由邏輯。

以下是一些關鍵要點:

  1. 隊列(Queue):消息的臨時存儲地,可以是持久化的。
  2. 交換機(Exchange):控制消息如何分發(fā)到不同的隊列。
    • Direct Exchange:嚴格匹配路由鍵。
    • Topic Exchange:支持通配符匹配路由鍵。
    • Fanout Exchange:廣播消息到所有綁定的隊列。
    • Headers Exchange:根據(jù)消息頭的內容進行路由。
  3. 綁定(Binding):將隊列與交換機連接起來,使用路由鍵來決定消息的流向。

通過配置類來定義這些組件,能夠簡化 RabbitMQ 與 Spring 應用的集成,并且通過靈活的路由規(guī)則支持復雜的消息傳遞需求。

六.在Springboot項目中使用RabbitMQ解決高并發(fā)問題

在 Spring Boot 項目中使用 RabbitMQ 來實現(xiàn)消息傳輸,處理并發(fā)問題是非常常見的一種方式。RabbitMQ 可以通過解耦應用程序的不同部分,并將任務分發(fā)給多個消費者,從而有效地解決并發(fā)和負載均衡問題。

1.引入依賴

首先,需要在 pom.xml 文件中引入 spring-boot-starter-amqp 依賴,這樣 Spring Boot 就可以與 RabbitMQ 集成:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2.配置RabbitMQ

application.ymlapplication.properties 文件中,配置 RabbitMQ 的連接信息:

spring:
  rabbitmq:
    host: localhost
    port: 15672
    username: hmall
    password: 123
    virtual-host: /
    listener:
      simple:
        concurrency: 3  # 設置消費者的最小數(shù)量
        max-concurrency: 10  # 設置消費者的最大數(shù)量

3.RabbitMQ配置類

創(chuàng)建一個配置類,用于聲明隊列、交換機、綁定等。

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 聲明隊列
    @Bean
    public Queue taskQueue() {
        return new Queue("taskQueue", true);  // true表示隊列持久化
    }

    // 聲明交換機
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("taskExchange");
    }

    // 綁定隊列和交換機
    @Bean
    public Binding binding(Queue taskQueue, TopicExchange topicExchange) {
        return new Binding("taskQueue",
                Binding.DestinationType.QUEUE,
                "taskExchange",
                "task.#", // 使用路由鍵
                null);
    }
}

4.創(chuàng)建消息生產(chǎn)者(Producer)

創(chuàng)建一個消息生產(chǎn)者(Producer),它將消息發(fā)送到 RabbitMQ 隊列。

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 發(fā)送消息
    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend("taskExchange", "task.new", message);  // 發(fā)送到交換機,使用路由鍵
    }
}

5.創(chuàng)建消息消費者(Consumer)

消費者從 RabbitMQ 隊列中異步接收消息,并進行并發(fā)處理。在消費者類中,使用 @RabbitListener 注解監(jiān)聽隊列,確保多個消費者可以同時處理消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TaskConsumer {

    // 監(jiān)聽隊列并異步處理消息
    @RabbitListener(queues = "taskQueue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        
        // 模擬處理消息的業(yè)務邏輯
        try {
            Thread.sleep(1000);  // 模擬耗時操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Finished processing message: " + message);
    }
}

6.增加并發(fā)消費者(多個消費者)

為了處理高并發(fā),我們可以通過設置 concurrencymax-concurrency 來控制消費者的最小和最大并發(fā)數(shù)。這樣,多個消費者可以同時處理來自隊列的消息。我們已經(jīng)在 application.yml 中配置了 concurrency,現(xiàn)在的配置允許最多啟動 20 個消費者來處理消息。

  • 最小并發(fā)數(shù):設置為 5,意味著在任何時候都會至少啟動 5 個消費者。
  • 最大并發(fā)數(shù):設置為 20,表示 RabbitMQ 可以根據(jù)負載增加最多 20 個消費者。

總結

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • java中1+1d/5和1+1/5的區(qū)別說明

    java中1+1d/5和1+1/5的區(qū)別說明

    這篇文章主要介紹了java中1+1d/5和1+1/5的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java的訪問修飾符與變量的作用域講解

    Java的訪問修飾符與變量的作用域講解

    這篇文章主要介紹了Java的訪問修飾符與變量的作用域講解,是Java入門學習中的基礎知識,需要的朋友可以參考下
    2015-09-09
  • 解決java main函數(shù)中的args數(shù)組傳值問題

    解決java main函數(shù)中的args數(shù)組傳值問題

    這篇文章主要介紹了解決java main函數(shù)中的args數(shù)組傳值問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Java跳出多重嵌套循環(huán)過程解析

    Java跳出多重嵌套循環(huán)過程解析

    這篇文章主要介紹了Java跳出多重嵌套循環(huán)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-12-12
  • 一文詳解Java抽象類到底有多抽象

    一文詳解Java抽象類到底有多抽象

    這篇文章主要介紹了一文詳解Java抽象類到底有多抽象,抽象方法所在的類必須是抽象類,子類若繼承了一個抽象類,就必須覆寫父類的所有抽象方法,這里的子類是普通類,是強制要求覆寫所有抽象方法,但是如果子類也是一個抽象類,那么就可以不覆寫
    2022-06-06
  • Spring Boot 簡單使用EhCache緩存框架的方法

    Spring Boot 簡單使用EhCache緩存框架的方法

    本篇文章主要介紹了Spring Boot 簡單使用EhCache緩存框架的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-07-07
  • SpringBoot 自定義注解異步記錄復雜日志詳解

    SpringBoot 自定義注解異步記錄復雜日志詳解

    這篇文章主要為大家介紹了SpringBoot 自定義注解異步記錄復雜日志詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-09-09
  • MyBatis不用@Param傳遞多個參數(shù)的操作

    MyBatis不用@Param傳遞多個參數(shù)的操作

    這篇文章主要介紹了MyBatis不用@Param傳遞多個參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Java?IO流之StringWriter和StringReader用法分析

    Java?IO流之StringWriter和StringReader用法分析

    這篇文章主要介紹了Java?IO流之StringWriter和StringReader用法分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉正(最新推薦)

    IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉正(最新推薦)

    北京時間2023年7月26日,IntelliJ IDEA 2023.2正式發(fā)布,IntelliJ IDEA 2023.2 引入 AI Assistant(AI助手),通過一組由 AI 提供支持的功能助力開發(fā),今天給大家分享IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉正,感興趣的朋友一起看看吧
    2023-10-10

最新評論