微服務(wù)架構(gòu)之使用RabbitMQ進(jìn)行異步處理方式
一.什么是RabbitMQ?
RabbitMQ 是一種流行的消息隊列(Message Queue)實現(xiàn),基于 AMQP 協(xié)議(Advanced Message Queuing Protocol)。它支持異步通信,使多個系統(tǒng)之間以非阻塞的方式交換數(shù)據(jù)。
在我們使用微服務(wù)的時候,微服務(wù)一旦拆分,必然涉及到服務(wù)之間的相互調(diào)用,目前我們服務(wù)之間調(diào)用采用的都是基于 OpenFeign 的調(diào)用。這種調(diào)用中,調(diào)用者發(fā)起請求后需要等待服務(wù)提供者執(zhí)行業(yè)務(wù)返回結(jié)果后,才能繼續(xù)執(zhí)行后面的業(yè)務(wù)。也就是說調(diào)用者在調(diào)用過程中處于阻塞狀態(tài),因此我們稱這種調(diào)用方式為同步調(diào)用,也可以叫同步通訊。
如果我們的業(yè)務(wù)需要實時得到服務(wù)提供方的響應(yīng),則應(yīng)該選擇同步通訊(同步調(diào)用)。而如果我們追求更高的效率,并且不需要實時響應(yīng),則應(yīng)該選擇異步通訊(異步調(diào)用)。
二.異步調(diào)用處理邏輯:
異步調(diào)用方式其實就是基于消息通知的方式,一般包含三個角色:
- 消息發(fā)送者:投遞消息的人,就是原來的調(diào)用方
- 消息Broker:管理、暫存、轉(zhuǎn)發(fā)消息,你可以把它理解成微信服務(wù)器
- 消息接收者:接收和處理消息的人,就是原來的服務(wù)提供方

除此之外還有:
- Exchange(交換機(jī)):用于路由消息。RabbitMQ 有多種交換機(jī)類型(如 direct、topic、fanout、headers),它們決定了消息如何被傳遞到隊列。
- Binding(綁定):連接交換機(jī)和隊列的規(guī)則。
在異步調(diào)用中,發(fā)送者不再直接同步調(diào)用接收者的業(yè)務(wù)接口,而是發(fā)送一條消息投遞給消息Broker。然后接收者根據(jù)自己的需求從消息Broker那里訂閱消息。每當(dāng)發(fā)送方發(fā)送消息后,接受者都能獲取消息并處理。這樣,發(fā)送消息的人和接收消息的人就完全解耦了。
異步調(diào)用的優(yōu)勢包括:
- 耦合度更低
- 性能更好
- 業(yè)務(wù)拓展性強(qiáng)
- 故障隔離,避免級聯(lián)失敗
當(dāng)然,異步通信也并非完美無缺,它存在下列缺點:
- 完全依賴于Broker的可靠性、安全性和性能
- 架構(gòu)復(fù)雜,后期維護(hù)和調(diào)試麻煩
三.RabbitMQ的基本使用
下面是RabbitMQ的官網(wǎng):https://www.rabbitmq.com/
1.安裝
首先將RabbitMQ的鏡像拉取下來,然后運行下面命令:
docker run \ -e RABBITMQ_DEFAULT_USER=hmall \ -e RABBITMQ_DEFAULT_PASS=123 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
在 docker run 命令中,您將容器的兩個端口暴露到主機(jī)上:
-p 15672:15672:將容器內(nèi)部的15672端口映射到主機(jī)的15672端口。這個端口通常用于 RabbitMQ 的 Web 管理控制臺(Management Console)。-p 5672:5672:將容器內(nèi)部的5672端口映射到主機(jī)的5672端口。這個端口用于 AMQP 協(xié)議,即客戶端與 RabbitMQ 進(jìn)行消息傳輸?shù)亩丝凇?/li>
隨后我們訪問http://虛擬機(jī)IP地址:15672來打開RabbitMQ的控制臺,默認(rèn)賬號密碼是 guest / guest。

在控制臺上主要可以關(guān)注三個信息:Exchanges(交換機(jī)),Queues(隊列),Admin(用戶管理)。
2.架構(gòu)圖

其中包含幾個概念:
publisher:生產(chǎn)者,也就是發(fā)送消息的一方consumer:消費者,也就是消費消息的一方queue:隊列,存儲消息。生產(chǎn)者投遞的消息會暫存在消息隊列中,等待消費者處理exchange:交換機(jī),負(fù)責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機(jī)決定投遞到哪個隊列。virtual host:虛擬主機(jī),起到數(shù)據(jù)隔離的作用。每個虛擬主機(jī)相互獨立,有各自的exchange、queue
RabbitMQ 使用 AMQP 協(xié)議,核心的消息模型包括:
- Producer 將消息發(fā)送到 Exchange。
- Exchange 將消息路由到 Queue。
- Consumer 從 Queue 獲取消息進(jìn)行處理。
RabbitMQ 消息生命周期:
- 消息發(fā)布:Producer 將消息發(fā)送到 RabbitMQ。
- 消息存儲:消息通過交換機(jī)路由到一個或多個隊列,隊列暫存這些消息。
- 消息消費:Consumer 從隊列中獲取并處理消息。
3.RabbitMQ控制臺的使用
在 RabbitMQ 中,交換機(jī)(Exchange) 和 隊列(Queue) 是核心概念。它們之間的關(guān)系決定了消息的路由和存儲方式。
(1)Exchanges 交換機(jī)
- 交換機(jī)是 消息的路由器,負(fù)責(zé)決定消息應(yīng)該被發(fā)送到哪個隊列。
- 生產(chǎn)者將消息發(fā)送給交換機(jī),而不是直接發(fā)送到隊列。
- 交換機(jī)根據(jù)路由規(guī)則 決定消息的走向(即發(fā)往哪些隊列)。
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
RabbitMQ 提供了四種常用的交換機(jī)類型,每種類型的路由規(guī)則不同:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊列。我們最早在控制臺使用的正是Fanout交換機(jī)。
- Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊列
- Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符,可以進(jìn)行模糊匹配。
- Headers:頭匹配,基于MQ的 header 信息來路由消息,用的較少。

我們可以再這里創(chuàng)建交換機(jī),Name表示創(chuàng)建的交換機(jī)的名字,Type表示可以選擇交換機(jī)的四種類型。創(chuàng)建成功后就可以在上面看到創(chuàng)建的交換機(jī)名字:

比如我們點擊amq.fanout查看交換機(jī)數(shù)據(jù)并且可以發(fā)送消息給消費者。
注意?。?!如果我們不將交換機(jī)指定隊列的話,由于沒有消費者存在,最終消息丟失了,這樣說明交換機(jī)沒有存儲消息的能力。
所以下面我們要先創(chuàng)建隊列,然后讓生產(chǎn)者推送的消息經(jīng)過交換機(jī)的傳遞后,到達(dá)消息隊列,然后再給消費者。所以生產(chǎn)者無需知道隊列的存在以此來達(dá)到解耦的效果。
(2)Queues 隊列
- 隊列用于 存儲消息,直到消費者消費它們。
- 隊列與消費者一一對應(yīng),即一個消費者從一個隊列讀取消息。
- 隊列按 FIFO(First In, First Out)的順序存儲消息。
交換機(jī)與隊列的關(guān)系:
交換機(jī)(Exchange)是消息的路由器,它決定了將消息發(fā)送到哪個隊列。隊列(Queue)是消息的存儲和處理地方。交換機(jī)本身并不知道具體的隊列,它只是通過綁定(Binding)來決定消息應(yīng)該被路由到哪些隊列。
交換機(jī)將消息通過路由鍵(Routing Key)發(fā)送到綁定的隊列,但交換機(jī)和隊列之間的連接并不是自動的,需要顯式地設(shè)置綁定。綁定指定了 交換機(jī) 和 隊列 之間的關(guān)系,以及 路由規(guī)則(例如,路由鍵匹配的規(guī)則)。

在這里我們填寫隊列名字即可,其他暫時可以不用填寫。
隨后我們向交換機(jī)進(jìn)行綁定(bind)隊列,隨后通過隊列傳輸給消費者。

這里的Routing key的出現(xiàn)是為了讓 Direct (交換機(jī)的類型)能夠選擇隊列而存在的。
我們在綁定隊列完成后會出現(xiàn)下面這樣,這樣證明我們成功為交換機(jī)綁定好兩個隊列:

隨后我們在下面窗口推送消息:

(3)Admin

①Users(用戶管理):
管理 RabbitMQ 中的用戶賬號,在這里 添加、刪除用戶,并設(shè)置每個用戶的權(quán)限。
每個用戶可分配不同的 角色:
- administrator:管理員,具有所有權(quán)限。
- monitoring:可以監(jiān)控和查看信息,但不能管理。
- policymaker:可以設(shè)置策略和參數(shù)。
- management:可以訪問管理界面但沒有策略權(quán)限。

Name:itheima,也就是用戶名Tags:administrator,說明itheima用戶是超級管理員,擁有所有權(quán)限Can access virtual host:/,可以訪問的virtual host,這里的/是默認(rèn)的virtual host
②Virtual Hosts(虛擬主機(jī)):
將 RabbitMQ 服務(wù)器劃分為多個 虛擬主機(jī)(vhost),類似于一個獨立的命名空間。
- 不同的應(yīng)用可以使用不同的虛擬主機(jī),彼此隔離。
- 每個虛擬主機(jī)都有自己的 交換機(jī)、隊列和用戶權(quán)限。

四.SpringAMOP的使用
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 提供的一個消息隊列集成模塊,主要用于簡化與 RabbitMQ 的集成。它通過 AMQP 協(xié)議來實現(xiàn)消息的生產(chǎn)和消費。

- Publisher:生產(chǎn)者,不再發(fā)送消息到隊列中,而是發(fā)給交換機(jī)
- Exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機(jī)綁定。
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
1.導(dǎo)入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.添加配置
在publisher以及consumer服務(wù)的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虛擬機(jī)IP
port: 5672 # 端口
virtual-host: /hmall # 虛擬主機(jī)
username: hmall # 用戶名
password: 123 # 密碼
listener:
simple:
prefetch: 1 # (能者多勞)每次只能獲取一條消息,處理完成才能獲取下一個消息5672 端口用于 AMQP 協(xié)議,通常用于客戶端與 RabbitMQ 進(jìn)行消息傳輸。,所以這里port綁定的是5672。
在 RabbitMQ 中,guest 用戶默認(rèn)只能在 本地 連接到 RabbitMQ 實例。如果你希望使用 guest 用戶進(jìn)行遠(yuǎn)程連接(即從非本地機(jī)器連接 RabbitMQ),RabbitMQ 默認(rèn)是 不允許 的。
3.在publisher服務(wù)中利用RabbitTemplate實現(xiàn)消息發(fā)送
RabbitTemplate 解釋:
RabbitTemplate 是 Spring AMQP 的核心實現(xiàn)類,它實現(xiàn)了 AmqpTemplate 接口,并且對 AmqpTemplate 提供了一些更高層次的封裝,簡化了消息發(fā)送和接收的操作。
RabbitTemplate 是大多數(shù) Spring AMQP 用戶的首選,它提供了很多便捷的方法和默認(rèn)的行為,使得消息交互變得更加簡單。
主要方法:
convertAndSend: 與AmqpTemplate相同,提供了消息轉(zhuǎn)換并發(fā)送的功能。receiveAndConvert: 從隊列中接收消息并轉(zhuǎn)換成 Java 對象。send: 發(fā)送消息,不進(jìn)行轉(zhuǎn)換。receive: 從隊列中接收消息。
RabbitTemplate 為發(fā)送消息提供了更豐富的功能,如消息轉(zhuǎn)換器、默認(rèn)交換機(jī)支持等,通常適用于大多數(shù)使用 Spring 的場景。
(1) 發(fā)送消息(convertAndSend)
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
exchange:指定交換機(jī)的名稱。routingKey:消息的路由鍵。message:消息體,可以是任何對象,RabbitTemplate會自動將其轉(zhuǎn)換為消息格式(例如,JSON、文本等)。
此方法的作用是將消息發(fā)送到指定交換機(jī),并且根據(jù)給定的路由鍵路由到相關(guān)隊列。
實例:
rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");這里消息 "支付成功的消息" 被發(fā)送到 pay.topic 交換機(jī),使用 pay.success 路由鍵路由到對應(yīng)的隊列。
(2)接收消息(receiveAndConvert)
public <T> T receiveAndConvert(String queueName);
queueName:消息隊列的名稱。- 返回類型為
<T>,表示接收到的消息會被轉(zhuǎn)換為指定的類型。
receiveAndConvert 會從指定的隊列接收消息,并自動將消息轉(zhuǎn)換成目標(biāo)對象類型。如果消息體不是可轉(zhuǎn)換的,方法將拋出異常。
String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue");
System.out.println("收到的消息: " + message);這里,pay.success.queue 隊列中的消息將被接收,并自動轉(zhuǎn)換為 String 類型。
還有一種就是 receive 方法:
public Message receive(String queueName);
queueName:隊列名稱。- 返回的是
Message類型,而不是直接轉(zhuǎn)換成對象。你可以通過Message對象獲取消息的內(nèi)容和其他屬性。
實例:
Message message = rabbitTemplate.receive("pay.success.queue");
if (message != null) {
System.out.println("收到的消息: " + new String(message.getBody()));
}(3)發(fā)送消息并獲取響應(yīng)(convertSendAndReceive)
public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
exchange:交換機(jī)名稱。routingKey:路由鍵。message:發(fā)送的消息對象。
convertSendAndReceive 方法既發(fā)送消息到交換機(jī),也等待從隊列返回響應(yīng)。它會根據(jù)指定的路由鍵將消息發(fā)送到交換機(jī),并等待響應(yīng)消息,然后將響應(yīng)轉(zhuǎn)換為返回類型。
實例:
String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "請求處理");
System.out.println("收到響應(yīng): " + response);(4)發(fā)送消息到特定隊列
public void send(String exchange, String routingKey, Message message);
exchange:交換機(jī)名稱。routingKey:路由鍵。message:消息對象(可以是Message類型)。
send 方法允許你將一個 Message 對象發(fā)送到指定的交換機(jī),并根據(jù)路由鍵進(jìn)行路由。
Message message = new Message("支付成功".getBytes());
rabbitTemplate.send("pay.topic", "pay.success", message);(5)設(shè)置消息監(jiān)聽器
RabbitTemplate 本身并不直接處理消息監(jiān)聽(接收消息),但是可以通過設(shè)置 RabbitListener 來監(jiān)聽消息,并將其與 RabbitTemplate 配合使用。一般來說,消息接收和處理是在 @RabbitListener 注解的監(jiān)聽器方法中完成的。
@RabbitListener(queues = "pay.success.queue")
public void receiveMessage(String message) {
System.out.println("接收到消息: " + message);
}下面看一個代碼實例:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 發(fā)送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}4.定義消費者實現(xiàn)異步調(diào)用
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId){
//調(diào)用方法
orderService.markOrderPaySuccess(orderId);
}
}①@RequiredArgsConstructor
這是 Lombok 提供的注解,自動為類中所有 final 修飾的字段生成一個包含這些字段的構(gòu)造函數(shù)。
使用這個注解可以免去手動編寫構(gòu)造函數(shù)的麻煩,尤其是在使用依賴注入時(例如注入 IOrderService)。
②@RabbitListener
@RabbitListener 注解用于監(jiān)聽來自 RabbitMQ 隊列的消息。
它會自動監(jiān)聽指定的隊列,當(dāng)有消息到達(dá)時,會觸發(fā) listenPaySuccess 方法進(jìn)行處理。
③QueueBinding(隊列綁定)
通過 @QueueBinding 注解,綁定了 隊列 和 交換機(jī),并指定了 路由鍵。
@Queue
name = "trade.pay.success.queue":指定隊列的名稱為trade.pay.success.queue。durable = "true":表示隊列是 持久化 的,即 RabbitMQ 重啟后隊列依然存在。- 隊列的作用:隊列是消息的臨時存儲地,消費者會從隊列中拉取消息并處理。
@Exchange
name = "pay.topic":指定交換機(jī)的名稱為pay.topic,這是一個 Topic Exchange(主題交換機(jī))。- 交換機(jī)的作用:交換機(jī)決定消息如何路由到隊列。Topic Exchange 可以根據(jù)路由鍵的匹配規(guī)則將消息路由到合適的隊列。
key = "pay.success"
- 路由鍵:指定了路由鍵為
pay.success。這意味著當(dāng)生產(chǎn)者發(fā)送的消息路由鍵是pay.success時,消息將被路由到trade.pay.success.queue隊列。
5.總流程處理過程
- 生產(chǎn)者:在支付成功后,生產(chǎn)者會發(fā)送一條消息到
pay.topic交換機(jī),消息的路由鍵為pay.success。 - 交換機(jī):
pay.topic交換機(jī)會根據(jù)路由鍵pay.success將消息路由到trade.pay.success.queue隊列。 - 消費者:
PayStatusListener作為消費者監(jiān)聽trade.pay.success.queue,當(dāng)有消息到達(dá)隊列時,它會接收到訂單 ID 并調(diào)用訂單服務(wù)更新訂單狀態(tài)。
五.使用配置類管理定義交換機(jī),隊列及兩者關(guān)系
在 Spring AMQP 中,交換機(jī)(Exchange)、隊列(Queue)、以及綁定(Binding)可以通過配置類來定義和管理。配置類可以幫助你靈活地創(chuàng)建和綁定交換機(jī)與隊列,并且可以根據(jù)業(yè)務(wù)需求自定義各種參數(shù)。
創(chuàng)建配置類效果展示:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 創(chuàng)建隊列
@Bean
public Queue queue() {
return new Queue("trade.pay.success.queue", true); // durable=true 表示隊列持久化
}
// 創(chuàng)建交換機(jī)
@Bean
public TopicExchange exchange() {
return new TopicExchange("pay.topic"); // 創(chuàng)建主題交換機(jī)
}
// 創(chuàng)建綁定關(guān)系(隊列與交換機(jī)通過 routing key 綁定)
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由鍵是 pay.success
}
}1.創(chuàng)建隊列:Queue
@Bean
public Queue queue() {
return new Queue("trade.pay.success.queue", true);
}作用:通過 @Bean 注解定義了一個隊列 Bean。Spring 容器會自動管理這個隊列,并在 RabbitMQ 上創(chuàng)建該隊列。
參數(shù):
"trade.pay.success.queue":這是隊列的名稱。每個隊列在 RabbitMQ 中必須有唯一的名稱。true:表示這個隊列是 持久化 的。持久化的隊列在 RabbitMQ 服務(wù)重啟后依然存在。- 隊列還可以設(shè)置其他屬性,例如是否自動刪除(
autoDelete)、是否排他性(exclusive)等,默認(rèn)是false。
2.創(chuàng)建交換機(jī):Exchange
@Bean
public TopicExchange exchange() {
return new TopicExchange("pay.topic");
}- 作用:通過
@Bean注解定義了一個 Topic Exchange 類型的交換機(jī)。 - 參數(shù):
"pay.topic":這是交換機(jī)的名稱。同樣,交換機(jī)在 RabbitMQ 中也必須有唯一的名稱。TopicExchange支持通過通配符(如*和#)進(jìn)行更靈活的消息匹配。
Topic Exchange 是一種交換機(jī)類型,它允許使用通配符來進(jìn)行路由。例如,路由鍵可以是 "pay.*",可以匹配 "pay.success" 或 "pay.failure"。在這里可以使用四種交換機(jī)類型來定義交換機(jī),具體場景具體分析使用。
3.創(chuàng)建綁定關(guān)系:Binding
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("pay.success");
}作用:通過 @Bean 注解定義了隊列和交換機(jī)的綁定關(guān)系。這個綁定決定了消息在何種條件下會從交換機(jī)路由到隊列。
參數(shù):
queue:隊列trade.pay.success.queue被傳入,表示將該隊列與交換機(jī)綁定。exchange:交換機(jī)pay.topic被傳入,表示將隊列與該交換機(jī)綁定。.with("pay.success"):指定了路由鍵pay.success,路由鍵pay.success會告訴交換機(jī),如果生產(chǎn)者發(fā)送的消息帶有這個路由鍵,那么該消息就會路由到trade.pay.success.queue隊列。
4.多個隊列綁定到同一個交換機(jī)
我們可以將多個隊列綁定到同一個交換機(jī),并使用不同的路由鍵。這樣可以實現(xiàn)根據(jù)不同的路由鍵來發(fā)送不同類型的消息到各自的隊列。
@Bean
public Queue paySuccessQueue() {
return new Queue("pay.success.queue", true);
}
@Bean
public Queue payFailureQueue() {
return new Queue("pay.failure.queue", true);
}
@Bean
public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
}
@Bean
public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
}- 在這個例子中,
pay.success.queue和pay.failure.queue都綁定到同一個交換機(jī)pay.topic,但使用不同的路由鍵。 - 消息路由邏輯:
- 當(dāng)生產(chǎn)者發(fā)送路由鍵為
pay.success的消息時,消息會路由到pay.success.queue隊列。 - 當(dāng)生產(chǎn)者發(fā)送路由鍵為
pay.failure的消息時,消息會路由到pay.failure.queue隊列。
- 當(dāng)生產(chǎn)者發(fā)送路由鍵為
BindingBuilder.bind(paySuccessQueue):paySuccessQueue參數(shù)就是綁定到交換機(jī)的隊列對象,也就是pay.success.queue。它會被傳遞給BindingBuilder,并且指定此隊列和交換機(jī)的綁定關(guān)系。.to(exchange):這里將隊列和交換機(jī)綁定,指定消息通過交換機(jī)路由到隊列。.with("pay.success"):指定路由鍵為pay.success,意思是只有路由鍵為pay.success的消息才會被路由到paySuccessQueue隊列中。
在 Spring AMQP 中,隊列對象(如 paySuccessQueue)是由 @Bean 注解的 Queue 類型方法返回的。Spring 會自動將返回的隊列對象放入到 Spring 容器中,并注入到需要它的地方。所以當(dāng)我們在 Binding 中使用 paySuccessQueue 時,實際上是在引用之前構(gòu)造并注冊到 Spring 容器中的隊列實例。
在 Spring 的上下文中,paySuccessQueue 和 payFailureQueue 是已經(jīng)被創(chuàng)建并管理的隊列對象,我們不需要手動創(chuàng)建隊列,只需要在 Binding 中通過引用這些對象來建立隊列與交換機(jī)之間的關(guān)系。
5.配置不同類型的交換機(jī)
除了 Topic Exchange,RabbitMQ 還支持其他幾種常見的交換機(jī)類型。這里分別演示如何創(chuàng)建 Direct Exchange、Fanout Exchange 和 Headers Exchange。
(1)Direct Exchange
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
@Bean
public Binding directBinding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
}Direct Exchange:直接交換機(jī)會根據(jù) 完全匹配的路由鍵 將消息發(fā)送到隊列。只有當(dāng)消息的路由鍵和綁定的路由鍵 完全一致 時,消息才會被路由到指定隊列。
(2)Fanout Exchange
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
@Bean
public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange); // 不需要路由鍵
}Fanout Exchange:扇出交換機(jī)會將消息發(fā)送到所有綁定的隊列,不需要考慮路由鍵。這個交換機(jī)通常用于廣播消息。
(3)Headers Exchange
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.exchange");
}
@Bean
public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
}Headers Exchange:頭交換機(jī)根據(jù)消息頭的內(nèi)容進(jìn)行路由,而不是依賴路由鍵。適用于按消息的元數(shù)據(jù)進(jìn)行路由的場景。
總結(jié):
通過 Spring AMQP 的配置類,你可以非常靈活地定義 RabbitMQ 的 交換機(jī)、隊列 和 綁定關(guān)系,并通過不同的路由鍵和交換機(jī)類型實現(xiàn)復(fù)雜的消息路由邏輯。
以下是一些關(guān)鍵要點:
- 隊列(Queue):消息的臨時存儲地,可以是持久化的。
- 交換機(jī)(Exchange):控制消息如何分發(fā)到不同的隊列。
- Direct Exchange:嚴(yán)格匹配路由鍵。
- Topic Exchange:支持通配符匹配路由鍵。
- Fanout Exchange:廣播消息到所有綁定的隊列。
- Headers Exchange:根據(jù)消息頭的內(nèi)容進(jìn)行路由。
- 綁定(Binding):將隊列與交換機(jī)連接起來,使用路由鍵來決定消息的流向。
通過配置類來定義這些組件,能夠簡化 RabbitMQ 與 Spring 應(yīng)用的集成,并且通過靈活的路由規(guī)則支持復(fù)雜的消息傳遞需求。
六.在Springboot項目中使用RabbitMQ解決高并發(fā)問題
在 Spring Boot 項目中使用 RabbitMQ 來實現(xiàn)消息傳輸,處理并發(fā)問題是非常常見的一種方式。RabbitMQ 可以通過解耦應(yīng)用程序的不同部分,并將任務(wù)分發(fā)給多個消費者,從而有效地解決并發(fā)和負(fù)載均衡問題。
1.引入依賴
首先,需要在 pom.xml 文件中引入 spring-boot-starter-amqp 依賴,這樣 Spring Boot 就可以與 RabbitMQ 集成:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>2.配置RabbitMQ
在 application.yml 或 application.properties 文件中,配置 RabbitMQ 的連接信息:
spring:
rabbitmq:
host: localhost
port: 15672
username: hmall
password: 123
virtual-host: /
listener:
simple:
concurrency: 3 # 設(shè)置消費者的最小數(shù)量
max-concurrency: 10 # 設(shè)置消費者的最大數(shù)量3.RabbitMQ配置類
創(chuàng)建一個配置類,用于聲明隊列、交換機(jī)、綁定等。
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 聲明隊列
@Bean
public Queue taskQueue() {
return new Queue("taskQueue", true); // true表示隊列持久化
}
// 聲明交換機(jī)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("taskExchange");
}
// 綁定隊列和交換機(jī)
@Bean
public Binding binding(Queue taskQueue, TopicExchange topicExchange) {
return new Binding("taskQueue",
Binding.DestinationType.QUEUE,
"taskExchange",
"task.#", // 使用路由鍵
null);
}
}4.創(chuàng)建消息生產(chǎn)者(Producer)
創(chuàng)建一個消息生產(chǎn)者(Producer),它將消息發(fā)送到 RabbitMQ 隊列。
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 發(fā)送消息
public void sendMessage(String message) {
System.out.println("Sending message: " + message);
rabbitTemplate.convertAndSend("taskExchange", "task.new", message); // 發(fā)送到交換機(jī),使用路由鍵
}
}5.創(chuàng)建消息消費者(Consumer)
消費者從 RabbitMQ 隊列中異步接收消息,并進(jìn)行并發(fā)處理。在消費者類中,使用 @RabbitListener 注解監(jiān)聽隊列,確保多個消費者可以同時處理消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TaskConsumer {
// 監(jiān)聽隊列并異步處理消息
@RabbitListener(queues = "taskQueue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
// 模擬處理消息的業(yè)務(wù)邏輯
try {
Thread.sleep(1000); // 模擬耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished processing message: " + message);
}
}6.增加并發(fā)消費者(多個消費者)
為了處理高并發(fā),我們可以通過設(shè)置 concurrency 和 max-concurrency 來控制消費者的最小和最大并發(fā)數(shù)。這樣,多個消費者可以同時處理來自隊列的消息。我們已經(jīng)在 application.yml 中配置了 concurrency,現(xiàn)在的配置允許最多啟動 20 個消費者來處理消息。
- 最小并發(fā)數(shù):設(shè)置為 5,意味著在任何時候都會至少啟動 5 個消費者。
- 最大并發(fā)數(shù):設(shè)置為 20,表示 RabbitMQ 可以根據(jù)負(fù)載增加最多 20 個消費者。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
解決java main函數(shù)中的args數(shù)組傳值問題
這篇文章主要介紹了解決java main函數(shù)中的args數(shù)組傳值問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Spring Boot 簡單使用EhCache緩存框架的方法
本篇文章主要介紹了Spring Boot 簡單使用EhCache緩存框架的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-07-07
SpringBoot 自定義注解異步記錄復(fù)雜日志詳解
這篇文章主要為大家介紹了SpringBoot 自定義注解異步記錄復(fù)雜日志詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Java?IO流之StringWriter和StringReader用法分析
這篇文章主要介紹了Java?IO流之StringWriter和StringReader用法分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正(最新推薦)
北京時間2023年7月26日,IntelliJ IDEA 2023.2正式發(fā)布,IntelliJ IDEA 2023.2 引入 AI Assistant(AI助手),通過一組由 AI 提供支持的功能助力開發(fā),今天給大家分享IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正,感興趣的朋友一起看看吧2023-10-10

