微服務(wù)架構(gòu)之使用RabbitMQ進(jìn)行異步處理方式
一.什么是RabbitMQ?
RabbitMQ 是一種流行的消息隊(duì)列(Message Queue)實(shí)現(xiàn),基于 AMQP 協(xié)議(Advanced Message Queuing Protocol)。它支持異步通信,使多個(gè)系統(tǒng)之間以非阻塞的方式交換數(shù)據(jù)。
在我們使用微服務(wù)的時(shí)候,微服務(wù)一旦拆分,必然涉及到服務(wù)之間的相互調(diào)用,目前我們服務(wù)之間調(diào)用采用的都是基于 OpenFeign 的調(diào)用。這種調(diào)用中,調(diào)用者發(fā)起請(qǐng)求后需要等待服務(wù)提供者執(zhí)行業(yè)務(wù)返回結(jié)果后,才能繼續(xù)執(zhí)行后面的業(yè)務(wù)。也就是說(shuō)調(diào)用者在調(diào)用過(guò)程中處于阻塞狀態(tài),因此我們稱這種調(diào)用方式為同步調(diào)用,也可以叫同步通訊。
如果我們的業(yè)務(wù)需要實(shí)時(shí)得到服務(wù)提供方的響應(yīng),則應(yīng)該選擇同步通訊(同步調(diào)用)。而如果我們追求更高的效率,并且不需要實(shí)時(shí)響應(yīng),則應(yīng)該選擇異步通訊(異步調(diào)用)。
二.異步調(diào)用處理邏輯:
異步調(diào)用方式其實(shí)就是基于消息通知的方式,一般包含三個(gè)角色:
- 消息發(fā)送者:投遞消息的人,就是原來(lái)的調(diào)用方
- 消息Broker:管理、暫存、轉(zhuǎn)發(fā)消息,你可以把它理解成微信服務(wù)器
- 消息接收者:接收和處理消息的人,就是原來(lái)的服務(wù)提供方
除此之外還有:
- Exchange(交換機(jī)):用于路由消息。RabbitMQ 有多種交換機(jī)類型(如 direct、topic、fanout、headers),它們決定了消息如何被傳遞到隊(duì)列。
- Binding(綁定):連接交換機(jī)和隊(duì)列的規(guī)則。
在異步調(diào)用中,發(fā)送者不再直接同步調(diào)用接收者的業(yè)務(wù)接口,而是發(fā)送一條消息投遞給消息Broker。然后接收者根據(jù)自己的需求從消息Broker那里訂閱消息。每當(dāng)發(fā)送方發(fā)送消息后,接受者都能獲取消息并處理。這樣,發(fā)送消息的人和接收消息的人就完全解耦了。
異步調(diào)用的優(yōu)勢(shì)包括:
- 耦合度更低
- 性能更好
- 業(yè)務(wù)拓展性強(qiáng)
- 故障隔離,避免級(jí)聯(lián)失敗
當(dāng)然,異步通信也并非完美無(wú)缺,它存在下列缺點(diǎn):
- 完全依賴于Broker的可靠性、安全性和性能
- 架構(gòu)復(fù)雜,后期維護(hù)和調(diào)試麻煩
三.RabbitMQ的基本使用
下面是RabbitMQ的官網(wǎng):https://www.rabbitmq.com/
1.安裝
首先將RabbitMQ的鏡像拉取下來(lái),然后運(yùn)行下面命令:
docker run \ -e RABBITMQ_DEFAULT_USER=hmall \ -e RABBITMQ_DEFAULT_PASS=123 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
在 docker run
命令中,您將容器的兩個(gè)端口暴露到主機(jī)上:
-p 15672:15672
:將容器內(nèi)部的15672
端口映射到主機(jī)的15672
端口。這個(gè)端口通常用于 RabbitMQ 的 Web 管理控制臺(tái)(Management Console)。-p 5672:5672
:將容器內(nèi)部的5672
端口映射到主機(jī)的5672
端口。這個(gè)端口用于 AMQP 協(xié)議,即客戶端與 RabbitMQ 進(jìn)行消息傳輸?shù)亩丝凇?/li>
隨后我們?cè)L問(wèn)http://虛擬機(jī)IP地址:15672來(lái)打開(kāi)RabbitMQ的控制臺(tái),默認(rèn)賬號(hào)密碼是 guest
/ guest。
在控制臺(tái)上主要可以關(guān)注三個(gè)信息:Exchanges(交換機(jī)),Queues(隊(duì)列),Admin(用戶管理)。
2.架構(gòu)圖
其中包含幾個(gè)概念:
publisher
:生產(chǎn)者,也就是發(fā)送消息的一方consumer
:消費(fèi)者,也就是消費(fèi)消息的一方queue
:隊(duì)列,存儲(chǔ)消息。生產(chǎn)者投遞的消息會(huì)暫存在消息隊(duì)列中,等待消費(fèi)者處理exchange
:交換機(jī),負(fù)責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機(jī)決定投遞到哪個(gè)隊(duì)列。virtual host
:虛擬主機(jī),起到數(shù)據(jù)隔離的作用。每個(gè)虛擬主機(jī)相互獨(dú)立,有各自的exchange、queue
RabbitMQ 使用 AMQP 協(xié)議,核心的消息模型包括:
- Producer 將消息發(fā)送到 Exchange。
- Exchange 將消息路由到 Queue。
- Consumer 從 Queue 獲取消息進(jìn)行處理。
RabbitMQ 消息生命周期:
- 消息發(fā)布:Producer 將消息發(fā)送到 RabbitMQ。
- 消息存儲(chǔ):消息通過(guò)交換機(jī)路由到一個(gè)或多個(gè)隊(duì)列,隊(duì)列暫存這些消息。
- 消息消費(fèi):Consumer 從隊(duì)列中獲取并處理消息。
3.RabbitMQ控制臺(tái)的使用
在 RabbitMQ 中,交換機(jī)(Exchange) 和 隊(duì)列(Queue) 是核心概念。它們之間的關(guān)系決定了消息的路由和存儲(chǔ)方式。
(1)Exchanges 交換機(jī)
- 交換機(jī)是 消息的路由器,負(fù)責(zé)決定消息應(yīng)該被發(fā)送到哪個(gè)隊(duì)列。
- 生產(chǎn)者將消息發(fā)送給交換機(jī),而不是直接發(fā)送到隊(duì)列。
- 交換機(jī)根據(jù)路由規(guī)則 決定消息的走向(即發(fā)往哪些隊(duì)列)。
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
RabbitMQ 提供了四種常用的交換機(jī)類型,每種類型的路由規(guī)則不同:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。我們最早在控制臺(tái)使用的正是Fanout交換機(jī)。
- Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊(duì)列
- Topic:通配符訂閱,與Direct類似,只不過(guò)RoutingKey可以使用通配符,可以進(jìn)行模糊匹配。
- Headers:頭匹配,基于MQ的 header 信息來(lái)路由消息,用的較少。
我們可以再這里創(chuàng)建交換機(jī),Name表示創(chuàng)建的交換機(jī)的名字,Type表示可以選擇交換機(jī)的四種類型。創(chuàng)建成功后就可以在上面看到創(chuàng)建的交換機(jī)名字:
比如我們點(diǎn)擊amq.fanout查看交換機(jī)數(shù)據(jù)并且可以發(fā)送消息給消費(fèi)者。
注意!??!如果我們不將交換機(jī)指定隊(duì)列的話,由于沒(méi)有消費(fèi)者存在,最終消息丟失了,這樣說(shuō)明交換機(jī)沒(méi)有存儲(chǔ)消息的能力。
所以下面我們要先創(chuàng)建隊(duì)列,然后讓生產(chǎn)者推送的消息經(jīng)過(guò)交換機(jī)的傳遞后,到達(dá)消息隊(duì)列,然后再給消費(fèi)者。所以生產(chǎn)者無(wú)需知道隊(duì)列的存在以此來(lái)達(dá)到解耦的效果。
(2)Queues 隊(duì)列
- 隊(duì)列用于 存儲(chǔ)消息,直到消費(fèi)者消費(fèi)它們。
- 隊(duì)列與消費(fèi)者一一對(duì)應(yīng),即一個(gè)消費(fèi)者從一個(gè)隊(duì)列讀取消息。
- 隊(duì)列按 FIFO(First In, First Out)的順序存儲(chǔ)消息。
交換機(jī)與隊(duì)列的關(guān)系:
交換機(jī)(Exchange)是消息的路由器,它決定了將消息發(fā)送到哪個(gè)隊(duì)列。隊(duì)列(Queue)是消息的存儲(chǔ)和處理地方。交換機(jī)本身并不知道具體的隊(duì)列,它只是通過(guò)綁定(Binding)來(lái)決定消息應(yīng)該被路由到哪些隊(duì)列。
交換機(jī)將消息通過(guò)路由鍵(Routing Key)發(fā)送到綁定的隊(duì)列,但交換機(jī)和隊(duì)列之間的連接并不是自動(dòng)的,需要顯式地設(shè)置綁定。綁定指定了 交換機(jī) 和 隊(duì)列 之間的關(guān)系,以及 路由規(guī)則(例如,路由鍵匹配的規(guī)則)。
在這里我們填寫(xiě)隊(duì)列名字即可,其他暫時(shí)可以不用填寫(xiě)。
隨后我們向交換機(jī)進(jìn)行綁定(bind)隊(duì)列,隨后通過(guò)隊(duì)列傳輸給消費(fèi)者。
這里的Routing key的出現(xiàn)是為了讓 Direct (交換機(jī)的類型)能夠選擇隊(duì)列而存在的。
我們?cè)诮壎?duì)列完成后會(huì)出現(xiàn)下面這樣,這樣證明我們成功為交換機(jī)綁定好兩個(gè)隊(duì)列:
隨后我們?cè)谙旅娲翱谕扑拖ⅲ?/p>
(3)Admin
①Users(用戶管理):
管理 RabbitMQ 中的用戶賬號(hào),在這里 添加、刪除用戶,并設(shè)置每個(gè)用戶的權(quán)限。
每個(gè)用戶可分配不同的 角色:
- administrator:管理員,具有所有權(quán)限。
- monitoring:可以監(jiān)控和查看信息,但不能管理。
- policymaker:可以設(shè)置策略和參數(shù)。
- management:可以訪問(wèn)管理界面但沒(méi)有策略權(quán)限。
Name
:itheima
,也就是用戶名Tags
:administrator
,說(shuō)明itheima
用戶是超級(jí)管理員,擁有所有權(quán)限Can access virtual host
:/
,可以訪問(wèn)的virtual host
,這里的/
是默認(rèn)的virtual host
②Virtual Hosts(虛擬主機(jī)):
將 RabbitMQ 服務(wù)器劃分為多個(gè) 虛擬主機(jī)(vhost),類似于一個(gè)獨(dú)立的命名空間。
- 不同的應(yīng)用可以使用不同的虛擬主機(jī),彼此隔離。
- 每個(gè)虛擬主機(jī)都有自己的 交換機(jī)、隊(duì)列和用戶權(quán)限。
四.SpringAMOP的使用
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 提供的一個(gè)消息隊(duì)列集成模塊,主要用于簡(jiǎn)化與 RabbitMQ 的集成。它通過(guò) AMQP 協(xié)議來(lái)實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)。
- Publisher:生產(chǎn)者,不再發(fā)送消息到隊(duì)列中,而是發(fā)給交換機(jī)
- Exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
- Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。不過(guò)隊(duì)列一定要與交換機(jī)綁定。
- Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒(méi)有變化
1.導(dǎo)入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.添加配置
在publisher以及consumer
服務(wù)的application.yml
中添加配置:
spring: rabbitmq: host: 192.168.150.101 # 你的虛擬機(jī)IP port: 5672 # 端口 virtual-host: /hmall # 虛擬主機(jī) username: hmall # 用戶名 password: 123 # 密碼 listener: simple: prefetch: 1 # (能者多勞)每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息
5672
端口用于 AMQP 協(xié)議,通常用于客戶端與 RabbitMQ 進(jìn)行消息傳輸。,所以這里port綁定的是5672。
在 RabbitMQ 中,guest
用戶默認(rèn)只能在 本地 連接到 RabbitMQ 實(shí)例。如果你希望使用 guest
用戶進(jìn)行遠(yuǎn)程連接(即從非本地機(jī)器連接 RabbitMQ),RabbitMQ 默認(rèn)是 不允許 的。
3.在publisher服務(wù)中利用RabbitTemplate實(shí)現(xiàn)消息發(fā)送
RabbitTemplate 解釋:
RabbitTemplate
是 Spring AMQP 的核心實(shí)現(xiàn)類,它實(shí)現(xiàn)了 AmqpTemplate
接口,并且對(duì) AmqpTemplate
提供了一些更高層次的封裝,簡(jiǎn)化了消息發(fā)送和接收的操作。
RabbitTemplate
是大多數(shù) Spring AMQP 用戶的首選,它提供了很多便捷的方法和默認(rèn)的行為,使得消息交互變得更加簡(jiǎn)單。
主要方法:
convertAndSend
: 與AmqpTemplate
相同,提供了消息轉(zhuǎn)換并發(fā)送的功能。receiveAndConvert
: 從隊(duì)列中接收消息并轉(zhuǎn)換成 Java 對(duì)象。send
: 發(fā)送消息,不進(jìn)行轉(zhuǎn)換。receive
: 從隊(duì)列中接收消息。
RabbitTemplate
為發(fā)送消息提供了更豐富的功能,如消息轉(zhuǎn)換器、默認(rèn)交換機(jī)支持等,通常適用于大多數(shù)使用 Spring 的場(chǎng)景。
(1) 發(fā)送消息(convertAndSend)
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
exchange
:指定交換機(jī)的名稱。routingKey
:消息的路由鍵。message
:消息體,可以是任何對(duì)象,RabbitTemplate
會(huì)自動(dòng)將其轉(zhuǎn)換為消息格式(例如,JSON、文本等)。
此方法的作用是將消息發(fā)送到指定交換機(jī),并且根據(jù)給定的路由鍵路由到相關(guān)隊(duì)列。
實(shí)例:
rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");
這里消息 "支付成功的消息"
被發(fā)送到 pay.topic
交換機(jī),使用 pay.success
路由鍵路由到對(duì)應(yīng)的隊(duì)列。
(2)接收消息(receiveAndConvert)
public <T> T receiveAndConvert(String queueName);
queueName
:消息隊(duì)列的名稱。- 返回類型為
<T>
,表示接收到的消息會(huì)被轉(zhuǎn)換為指定的類型。
receiveAndConvert
會(huì)從指定的隊(duì)列接收消息,并自動(dòng)將消息轉(zhuǎn)換成目標(biāo)對(duì)象類型。如果消息體不是可轉(zhuǎn)換的,方法將拋出異常。
String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue"); System.out.println("收到的消息: " + message);
這里,pay.success.queue
隊(duì)列中的消息將被接收,并自動(dòng)轉(zhuǎn)換為 String
類型。
還有一種就是 receive 方法:
public Message receive(String queueName);
queueName
:隊(duì)列名稱。- 返回的是
Message
類型,而不是直接轉(zhuǎn)換成對(duì)象。你可以通過(guò)Message
對(duì)象獲取消息的內(nèi)容和其他屬性。
實(shí)例:
Message message = rabbitTemplate.receive("pay.success.queue"); if (message != null) { System.out.println("收到的消息: " + new String(message.getBody())); }
(3)發(fā)送消息并獲取響應(yīng)(convertSendAndReceive)
public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
exchange
:交換機(jī)名稱。routingKey
:路由鍵。message
:發(fā)送的消息對(duì)象。
convertSendAndReceive
方法既發(fā)送消息到交換機(jī),也等待從隊(duì)列返回響應(yīng)。它會(huì)根據(jù)指定的路由鍵將消息發(fā)送到交換機(jī),并等待響應(yīng)消息,然后將響應(yīng)轉(zhuǎn)換為返回類型。
實(shí)例:
String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "請(qǐng)求處理"); System.out.println("收到響應(yīng): " + response);
(4)發(fā)送消息到特定隊(duì)列
public void send(String exchange, String routingKey, Message message);
exchange
:交換機(jī)名稱。routingKey
:路由鍵。message
:消息對(duì)象(可以是Message
類型)。
send
方法允許你將一個(gè) Message
對(duì)象發(fā)送到指定的交換機(jī),并根據(jù)路由鍵進(jìn)行路由。
Message message = new Message("支付成功".getBytes()); rabbitTemplate.send("pay.topic", "pay.success", message);
(5)設(shè)置消息監(jiān)聽(tīng)器
RabbitTemplate
本身并不直接處理消息監(jiān)聽(tīng)(接收消息),但是可以通過(guò)設(shè)置 RabbitListener
來(lái)監(jiān)聽(tīng)消息,并將其與 RabbitTemplate
配合使用。一般來(lái)說(shuō),消息接收和處理是在 @RabbitListener
注解的監(jiān)聽(tīng)器方法中完成的。
@RabbitListener(queues = "pay.success.queue") public void receiveMessage(String message) { System.out.println("接收到消息: " + message); }
下面看一個(gè)代碼實(shí)例:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 隊(duì)列名稱 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(queueName, message); } }
4.定義消費(fèi)者實(shí)現(xiàn)異步調(diào)用
@Component @RequiredArgsConstructor public class PayStatusListener { private final IOrderService orderService; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "trade.pay.success.queue", durable = "true"), exchange = @Exchange(name = "pay.topic"), key = "pay.success" )) public void listenPaySuccess(Long orderId){ //調(diào)用方法 orderService.markOrderPaySuccess(orderId); } }
①@RequiredArgsConstructor
這是 Lombok 提供的注解,自動(dòng)為類中所有 final
修飾的字段生成一個(gè)包含這些字段的構(gòu)造函數(shù)。
使用這個(gè)注解可以免去手動(dòng)編寫(xiě)構(gòu)造函數(shù)的麻煩,尤其是在使用依賴注入時(shí)(例如注入 IOrderService
)。
②@RabbitListener
@RabbitListener
注解用于監(jiān)聽(tīng)來(lái)自 RabbitMQ 隊(duì)列的消息。
它會(huì)自動(dòng)監(jiān)聽(tīng)指定的隊(duì)列,當(dāng)有消息到達(dá)時(shí),會(huì)觸發(fā) listenPaySuccess
方法進(jìn)行處理。
③QueueBinding(隊(duì)列綁定)
通過(guò) @QueueBinding
注解,綁定了 隊(duì)列 和 交換機(jī),并指定了 路由鍵。
@Queue
name = "trade.pay.success.queue"
:指定隊(duì)列的名稱為trade.pay.success.queue
。durable = "true"
:表示隊(duì)列是 持久化 的,即 RabbitMQ 重啟后隊(duì)列依然存在。- 隊(duì)列的作用:隊(duì)列是消息的臨時(shí)存儲(chǔ)地,消費(fèi)者會(huì)從隊(duì)列中拉取消息并處理。
@Exchange
name = "pay.topic"
:指定交換機(jī)的名稱為pay.topic
,這是一個(gè) Topic Exchange(主題交換機(jī))。- 交換機(jī)的作用:交換機(jī)決定消息如何路由到隊(duì)列。Topic Exchange 可以根據(jù)路由鍵的匹配規(guī)則將消息路由到合適的隊(duì)列。
key = "pay.success"
- 路由鍵:指定了路由鍵為
pay.success
。這意味著當(dāng)生產(chǎn)者發(fā)送的消息路由鍵是pay.success
時(shí),消息將被路由到trade.pay.success.queue
隊(duì)列。
5.總流程處理過(guò)程
- 生產(chǎn)者:在支付成功后,生產(chǎn)者會(huì)發(fā)送一條消息到
pay.topic
交換機(jī),消息的路由鍵為pay.success
。 - 交換機(jī):
pay.topic
交換機(jī)會(huì)根據(jù)路由鍵pay.success
將消息路由到trade.pay.success.queue
隊(duì)列。 - 消費(fèi)者:
PayStatusListener
作為消費(fèi)者監(jiān)聽(tīng)trade.pay.success.queue
,當(dāng)有消息到達(dá)隊(duì)列時(shí),它會(huì)接收到訂單 ID 并調(diào)用訂單服務(wù)更新訂單狀態(tài)。
五.使用配置類管理定義交換機(jī),隊(duì)列及兩者關(guān)系
在 Spring AMQP 中,交換機(jī)(Exchange)、隊(duì)列(Queue)、以及綁定(Binding)可以通過(guò)配置類來(lái)定義和管理。配置類可以幫助你靈活地創(chuàng)建和綁定交換機(jī)與隊(duì)列,并且可以根據(jù)業(yè)務(wù)需求自定義各種參數(shù)。
創(chuàng)建配置類效果展示:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 創(chuàng)建隊(duì)列 @Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); // durable=true 表示隊(duì)列持久化 } // 創(chuàng)建交換機(jī) @Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); // 創(chuàng)建主題交換機(jī) } // 創(chuàng)建綁定關(guān)系(隊(duì)列與交換機(jī)通過(guò) routing key 綁定) @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由鍵是 pay.success } }
1.創(chuàng)建隊(duì)列:Queue
@Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); }
作用:通過(guò) @Bean
注解定義了一個(gè)隊(duì)列 Bean。Spring 容器會(huì)自動(dòng)管理這個(gè)隊(duì)列,并在 RabbitMQ 上創(chuàng)建該隊(duì)列。
參數(shù):
"trade.pay.success.queue"
:這是隊(duì)列的名稱。每個(gè)隊(duì)列在 RabbitMQ 中必須有唯一的名稱。true
:表示這個(gè)隊(duì)列是 持久化 的。持久化的隊(duì)列在 RabbitMQ 服務(wù)重啟后依然存在。- 隊(duì)列還可以設(shè)置其他屬性,例如是否自動(dòng)刪除(
autoDelete
)、是否排他性(exclusive
)等,默認(rèn)是false
。
2.創(chuàng)建交換機(jī):Exchange
@Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); }
- 作用:通過(guò)
@Bean
注解定義了一個(gè) Topic Exchange 類型的交換機(jī)。 - 參數(shù):
"pay.topic"
:這是交換機(jī)的名稱。同樣,交換機(jī)在 RabbitMQ 中也必須有唯一的名稱。TopicExchange
支持通過(guò)通配符(如*
和#
)進(jìn)行更靈活的消息匹配。
Topic Exchange 是一種交換機(jī)類型,它允許使用通配符來(lái)進(jìn)行路由。例如,路由鍵可以是 "pay.*"
,可以匹配 "pay.success"
或 "pay.failure"
。在這里可以使用四種交換機(jī)類型來(lái)定義交換機(jī),具體場(chǎng)景具體分析使用。
3.創(chuàng)建綁定關(guān)系:Binding
@Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); }
作用:通過(guò) @Bean
注解定義了隊(duì)列和交換機(jī)的綁定關(guān)系。這個(gè)綁定決定了消息在何種條件下會(huì)從交換機(jī)路由到隊(duì)列。
參數(shù):
queue
:隊(duì)列trade.pay.success.queue
被傳入,表示將該隊(duì)列與交換機(jī)綁定。exchange
:交換機(jī)pay.topic
被傳入,表示將隊(duì)列與該交換機(jī)綁定。.with("pay.success")
:指定了路由鍵pay.success
,路由鍵pay.success
會(huì)告訴交換機(jī),如果生產(chǎn)者發(fā)送的消息帶有這個(gè)路由鍵,那么該消息就會(huì)路由到trade.pay.success.queue
隊(duì)列。
4.多個(gè)隊(duì)列綁定到同一個(gè)交換機(jī)
我們可以將多個(gè)隊(duì)列綁定到同一個(gè)交換機(jī),并使用不同的路由鍵。這樣可以實(shí)現(xiàn)根據(jù)不同的路由鍵來(lái)發(fā)送不同類型的消息到各自的隊(duì)列。
@Bean public Queue paySuccessQueue() { return new Queue("pay.success.queue", true); } @Bean public Queue payFailureQueue() { return new Queue("pay.failure.queue", true); } @Bean public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) { return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success"); } @Bean public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) { return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure"); }
- 在這個(gè)例子中,
pay.success.queue
和pay.failure.queue
都綁定到同一個(gè)交換機(jī)pay.topic
,但使用不同的路由鍵。 - 消息路由邏輯:
- 當(dāng)生產(chǎn)者發(fā)送路由鍵為
pay.success
的消息時(shí),消息會(huì)路由到pay.success.queue
隊(duì)列。 - 當(dāng)生產(chǎn)者發(fā)送路由鍵為
pay.failure
的消息時(shí),消息會(huì)路由到pay.failure.queue
隊(duì)列。
- 當(dāng)生產(chǎn)者發(fā)送路由鍵為
BindingBuilder.bind(paySuccessQueue)
:paySuccessQueue
參數(shù)就是綁定到交換機(jī)的隊(duì)列對(duì)象,也就是pay.success.queue
。它會(huì)被傳遞給BindingBuilder
,并且指定此隊(duì)列和交換機(jī)的綁定關(guān)系。.to(exchange)
:這里將隊(duì)列和交換機(jī)綁定,指定消息通過(guò)交換機(jī)路由到隊(duì)列。.with("pay.success")
:指定路由鍵為pay.success
,意思是只有路由鍵為pay.success
的消息才會(huì)被路由到paySuccessQueue
隊(duì)列中。
在 Spring AMQP 中,隊(duì)列對(duì)象(如 paySuccessQueue
)是由 @Bean
注解的 Queue
類型方法返回的。Spring 會(huì)自動(dòng)將返回的隊(duì)列對(duì)象放入到 Spring 容器中,并注入到需要它的地方。所以當(dāng)我們?cè)?Binding
中使用 paySuccessQueue
時(shí),實(shí)際上是在引用之前構(gòu)造并注冊(cè)到 Spring 容器中的隊(duì)列實(shí)例。
在 Spring 的上下文中,paySuccessQueue
和 payFailureQueue
是已經(jīng)被創(chuàng)建并管理的隊(duì)列對(duì)象,我們不需要手動(dòng)創(chuàng)建隊(duì)列,只需要在 Binding
中通過(guò)引用這些對(duì)象來(lái)建立隊(duì)列與交換機(jī)之間的關(guān)系。
5.配置不同類型的交換機(jī)
除了 Topic Exchange,RabbitMQ 還支持其他幾種常見(jiàn)的交換機(jī)類型。這里分別演示如何創(chuàng)建 Direct Exchange、Fanout Exchange 和 Headers Exchange。
(1)Direct Exchange
@Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); } @Bean public Binding directBinding(Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key"); }
Direct Exchange:直接交換機(jī)會(huì)根據(jù) 完全匹配的路由鍵 將消息發(fā)送到隊(duì)列。只有當(dāng)消息的路由鍵和綁定的路由鍵 完全一致 時(shí),消息才會(huì)被路由到指定隊(duì)列。
(2)Fanout Exchange
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange"); } @Bean public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); // 不需要路由鍵 }
Fanout Exchange:扇出交換機(jī)會(huì)將消息發(fā)送到所有綁定的隊(duì)列,不需要考慮路由鍵。這個(gè)交換機(jī)通常用于廣播消息。
(3)Headers Exchange
@Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange"); } @Bean public Binding headersBinding(Queue queue, HeadersExchange headersExchange) { return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value"); }
Headers Exchange:頭交換機(jī)根據(jù)消息頭的內(nèi)容進(jìn)行路由,而不是依賴路由鍵。適用于按消息的元數(shù)據(jù)進(jìn)行路由的場(chǎng)景。
總結(jié):
通過(guò) Spring AMQP 的配置類,你可以非常靈活地定義 RabbitMQ 的 交換機(jī)、隊(duì)列 和 綁定關(guān)系,并通過(guò)不同的路由鍵和交換機(jī)類型實(shí)現(xiàn)復(fù)雜的消息路由邏輯。
以下是一些關(guān)鍵要點(diǎn):
- 隊(duì)列(Queue):消息的臨時(shí)存儲(chǔ)地,可以是持久化的。
- 交換機(jī)(Exchange):控制消息如何分發(fā)到不同的隊(duì)列。
- Direct Exchange:嚴(yán)格匹配路由鍵。
- Topic Exchange:支持通配符匹配路由鍵。
- Fanout Exchange:廣播消息到所有綁定的隊(duì)列。
- Headers Exchange:根據(jù)消息頭的內(nèi)容進(jìn)行路由。
- 綁定(Binding):將隊(duì)列與交換機(jī)連接起來(lái),使用路由鍵來(lái)決定消息的流向。
通過(guò)配置類來(lái)定義這些組件,能夠簡(jiǎn)化 RabbitMQ 與 Spring 應(yīng)用的集成,并且通過(guò)靈活的路由規(guī)則支持復(fù)雜的消息傳遞需求。
六.在Springboot項(xiàng)目中使用RabbitMQ解決高并發(fā)問(wèn)題
在 Spring Boot 項(xiàng)目中使用 RabbitMQ 來(lái)實(shí)現(xiàn)消息傳輸,處理并發(fā)問(wèn)題是非常常見(jiàn)的一種方式。RabbitMQ 可以通過(guò)解耦應(yīng)用程序的不同部分,并將任務(wù)分發(fā)給多個(gè)消費(fèi)者,從而有效地解決并發(fā)和負(fù)載均衡問(wèn)題。
1.引入依賴
首先,需要在 pom.xml
文件中引入 spring-boot-starter-amqp
依賴,這樣 Spring Boot 就可以與 RabbitMQ 集成:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2.配置RabbitMQ
在 application.yml
或 application.properties
文件中,配置 RabbitMQ 的連接信息:
spring: rabbitmq: host: localhost port: 15672 username: hmall password: 123 virtual-host: / listener: simple: concurrency: 3 # 設(shè)置消費(fèi)者的最小數(shù)量 max-concurrency: 10 # 設(shè)置消費(fèi)者的最大數(shù)量
3.RabbitMQ配置類
創(chuàng)建一個(gè)配置類,用于聲明隊(duì)列、交換機(jī)、綁定等。
package com.example.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 聲明隊(duì)列 @Bean public Queue taskQueue() { return new Queue("taskQueue", true); // true表示隊(duì)列持久化 } // 聲明交換機(jī) @Bean public TopicExchange topicExchange() { return new TopicExchange("taskExchange"); } // 綁定隊(duì)列和交換機(jī) @Bean public Binding binding(Queue taskQueue, TopicExchange topicExchange) { return new Binding("taskQueue", Binding.DestinationType.QUEUE, "taskExchange", "task.#", // 使用路由鍵 null); } }
4.創(chuàng)建消息生產(chǎn)者(Producer)
創(chuàng)建一個(gè)消息生產(chǎn)者(Producer),它將消息發(fā)送到 RabbitMQ 隊(duì)列。
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TaskProducer { @Autowired private RabbitTemplate rabbitTemplate; // 發(fā)送消息 public void sendMessage(String message) { System.out.println("Sending message: " + message); rabbitTemplate.convertAndSend("taskExchange", "task.new", message); // 發(fā)送到交換機(jī),使用路由鍵 } }
5.創(chuàng)建消息消費(fèi)者(Consumer)
消費(fèi)者從 RabbitMQ 隊(duì)列中異步接收消息,并進(jìn)行并發(fā)處理。在消費(fèi)者類中,使用 @RabbitListener
注解監(jiān)聽(tīng)隊(duì)列,確保多個(gè)消費(fèi)者可以同時(shí)處理消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TaskConsumer { // 監(jiān)聽(tīng)隊(duì)列并異步處理消息 @RabbitListener(queues = "taskQueue") public void receiveMessage(String message) { System.out.println("Received message: " + message); // 模擬處理消息的業(yè)務(wù)邏輯 try { Thread.sleep(1000); // 模擬耗時(shí)操作 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Finished processing message: " + message); } }
6.增加并發(fā)消費(fèi)者(多個(gè)消費(fèi)者)
為了處理高并發(fā),我們可以通過(guò)設(shè)置 concurrency
和 max-concurrency
來(lái)控制消費(fèi)者的最小和最大并發(fā)數(shù)。這樣,多個(gè)消費(fèi)者可以同時(shí)處理來(lái)自隊(duì)列的消息。我們已經(jīng)在 application.yml
中配置了 concurrency
,現(xiàn)在的配置允許最多啟動(dòng) 20 個(gè)消費(fèi)者來(lái)處理消息。
- 最小并發(fā)數(shù):設(shè)置為 5,意味著在任何時(shí)候都會(huì)至少啟動(dòng) 5 個(gè)消費(fèi)者。
- 最大并發(fā)數(shù):設(shè)置為 20,表示 RabbitMQ 可以根據(jù)負(fù)載增加最多 20 個(gè)消費(fèi)者。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java中1+1d/5和1+1/5的區(qū)別說(shuō)明
這篇文章主要介紹了java中1+1d/5和1+1/5的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10解決java main函數(shù)中的args數(shù)組傳值問(wèn)題
這篇文章主要介紹了解決java main函數(shù)中的args數(shù)組傳值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法
本篇文章主要介紹了Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07SpringBoot 自定義注解異步記錄復(fù)雜日志詳解
這篇文章主要為大家介紹了SpringBoot 自定義注解異步記錄復(fù)雜日志詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09MyBatis不用@Param傳遞多個(gè)參數(shù)的操作
這篇文章主要介紹了MyBatis不用@Param傳遞多個(gè)參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Java?IO流之StringWriter和StringReader用法分析
這篇文章主要介紹了Java?IO流之StringWriter和StringReader用法分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正(最新推薦)
北京時(shí)間2023年7月26日,IntelliJ IDEA 2023.2正式發(fā)布,IntelliJ IDEA 2023.2 引入 AI Assistant(AI助手),通過(guò)一組由 AI 提供支持的功能助力開(kāi)發(fā),今天給大家分享IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正,感興趣的朋友一起看看吧2023-10-10