微服務架構之使用RabbitMQ進行異步處理方式
一.什么是RabbitMQ?
RabbitMQ 是一種流行的消息隊列(Message Queue)實現(xiàn),基于 AMQP 協(xié)議(Advanced Message Queuing Protocol)。它支持異步通信,使多個系統(tǒng)之間以非阻塞的方式交換數(shù)據(jù)。
在我們使用微服務的時候,微服務一旦拆分,必然涉及到服務之間的相互調用,目前我們服務之間調用采用的都是基于 OpenFeign 的調用。這種調用中,調用者發(fā)起請求后需要等待服務提供者執(zhí)行業(yè)務返回結果后,才能繼續(xù)執(zhí)行后面的業(yè)務。也就是說調用者在調用過程中處于阻塞狀態(tài),因此我們稱這種調用方式為同步調用,也可以叫同步通訊。
如果我們的業(yè)務需要實時得到服務提供方的響應,則應該選擇同步通訊(同步調用)。而如果我們追求更高的效率,并且不需要實時響應,則應該選擇異步通訊(異步調用)。
二.異步調用處理邏輯:
異步調用方式其實就是基于消息通知的方式,一般包含三個角色:
- 消息發(fā)送者:投遞消息的人,就是原來的調用方
- 消息Broker:管理、暫存、轉發(fā)消息,你可以把它理解成微信服務器
- 消息接收者:接收和處理消息的人,就是原來的服務提供方
除此之外還有:
- Exchange(交換機):用于路由消息。RabbitMQ 有多種交換機類型(如 direct、topic、fanout、headers),它們決定了消息如何被傳遞到隊列。
- Binding(綁定):連接交換機和隊列的規(guī)則。
在異步調用中,發(fā)送者不再直接同步調用接收者的業(yè)務接口,而是發(fā)送一條消息投遞給消息Broker。然后接收者根據(jù)自己的需求從消息Broker那里訂閱消息。每當發(fā)送方發(fā)送消息后,接受者都能獲取消息并處理。這樣,發(fā)送消息的人和接收消息的人就完全解耦了。
異步調用的優(yōu)勢包括:
- 耦合度更低
- 性能更好
- 業(yè)務拓展性強
- 故障隔離,避免級聯(lián)失敗
當然,異步通信也并非完美無缺,它存在下列缺點:
- 完全依賴于Broker的可靠性、安全性和性能
- 架構復雜,后期維護和調試麻煩
三.RabbitMQ的基本使用
下面是RabbitMQ的官網(wǎng):https://www.rabbitmq.com/
1.安裝
首先將RabbitMQ的鏡像拉取下來,然后運行下面命令:
docker run \ -e RABBITMQ_DEFAULT_USER=hmall \ -e RABBITMQ_DEFAULT_PASS=123 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
在 docker run
命令中,您將容器的兩個端口暴露到主機上:
-p 15672:15672
:將容器內部的15672
端口映射到主機的15672
端口。這個端口通常用于 RabbitMQ 的 Web 管理控制臺(Management Console)。-p 5672:5672
:將容器內部的5672
端口映射到主機的5672
端口。這個端口用于 AMQP 協(xié)議,即客戶端與 RabbitMQ 進行消息傳輸?shù)亩丝凇?/li>
隨后我們訪問http://虛擬機IP地址:15672來打開RabbitMQ的控制臺,默認賬號密碼是 guest
/ guest。
在控制臺上主要可以關注三個信息:Exchanges(交換機),Queues(隊列),Admin(用戶管理)。
2.架構圖
其中包含幾個概念:
publisher
:生產(chǎn)者,也就是發(fā)送消息的一方consumer
:消費者,也就是消費消息的一方queue
:隊列,存儲消息。生產(chǎn)者投遞的消息會暫存在消息隊列中,等待消費者處理exchange
:交換機,負責消息路由。生產(chǎn)者發(fā)送的消息由交換機決定投遞到哪個隊列。virtual host
:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue
RabbitMQ 使用 AMQP 協(xié)議,核心的消息模型包括:
- Producer 將消息發(fā)送到 Exchange。
- Exchange 將消息路由到 Queue。
- Consumer 從 Queue 獲取消息進行處理。
RabbitMQ 消息生命周期:
- 消息發(fā)布:Producer 將消息發(fā)送到 RabbitMQ。
- 消息存儲:消息通過交換機路由到一個或多個隊列,隊列暫存這些消息。
- 消息消費:Consumer 從隊列中獲取并處理消息。
3.RabbitMQ控制臺的使用
在 RabbitMQ 中,交換機(Exchange) 和 隊列(Queue) 是核心概念。它們之間的關系決定了消息的路由和存儲方式。
(1)Exchanges 交換機
- 交換機是 消息的路由器,負責決定消息應該被發(fā)送到哪個隊列。
- 生產(chǎn)者將消息發(fā)送給交換機,而不是直接發(fā)送到隊列。
- 交換機根據(jù)路由規(guī)則 決定消息的走向(即發(fā)往哪些隊列)。
Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
RabbitMQ 提供了四種常用的交換機類型,每種類型的路由規(guī)則不同:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機。
- Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊列
- Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符,可以進行模糊匹配。
- Headers:頭匹配,基于MQ的 header 信息來路由消息,用的較少。
我們可以再這里創(chuàng)建交換機,Name表示創(chuàng)建的交換機的名字,Type表示可以選擇交換機的四種類型。創(chuàng)建成功后就可以在上面看到創(chuàng)建的交換機名字:
比如我們點擊amq.fanout查看交換機數(shù)據(jù)并且可以發(fā)送消息給消費者。
注意?。。∪绻覀儾粚⒔粨Q機指定隊列的話,由于沒有消費者存在,最終消息丟失了,這樣說明交換機沒有存儲消息的能力。
所以下面我們要先創(chuàng)建隊列,然后讓生產(chǎn)者推送的消息經(jīng)過交換機的傳遞后,到達消息隊列,然后再給消費者。所以生產(chǎn)者無需知道隊列的存在以此來達到解耦的效果。
(2)Queues 隊列
- 隊列用于 存儲消息,直到消費者消費它們。
- 隊列與消費者一一對應,即一個消費者從一個隊列讀取消息。
- 隊列按 FIFO(First In, First Out)的順序存儲消息。
交換機與隊列的關系:
交換機(Exchange)是消息的路由器,它決定了將消息發(fā)送到哪個隊列。隊列(Queue)是消息的存儲和處理地方。交換機本身并不知道具體的隊列,它只是通過綁定(Binding)來決定消息應該被路由到哪些隊列。
交換機將消息通過路由鍵(Routing Key)發(fā)送到綁定的隊列,但交換機和隊列之間的連接并不是自動的,需要顯式地設置綁定。綁定指定了 交換機 和 隊列 之間的關系,以及 路由規(guī)則(例如,路由鍵匹配的規(guī)則)。
在這里我們填寫隊列名字即可,其他暫時可以不用填寫。
隨后我們向交換機進行綁定(bind)隊列,隨后通過隊列傳輸給消費者。
這里的Routing key的出現(xiàn)是為了讓 Direct (交換機的類型)能夠選擇隊列而存在的。
我們在綁定隊列完成后會出現(xiàn)下面這樣,這樣證明我們成功為交換機綁定好兩個隊列:
隨后我們在下面窗口推送消息:
(3)Admin
①Users(用戶管理):
管理 RabbitMQ 中的用戶賬號,在這里 添加、刪除用戶,并設置每個用戶的權限。
每個用戶可分配不同的 角色:
- administrator:管理員,具有所有權限。
- monitoring:可以監(jiān)控和查看信息,但不能管理。
- policymaker:可以設置策略和參數(shù)。
- management:可以訪問管理界面但沒有策略權限。
Name
:itheima
,也就是用戶名Tags
:administrator
,說明itheima
用戶是超級管理員,擁有所有權限Can access virtual host
:/
,可以訪問的virtual host
,這里的/
是默認的virtual host
②Virtual Hosts(虛擬主機):
將 RabbitMQ 服務器劃分為多個 虛擬主機(vhost),類似于一個獨立的命名空間。
- 不同的應用可以使用不同的虛擬主機,彼此隔離。
- 每個虛擬主機都有自己的 交換機、隊列和用戶權限。
四.SpringAMOP的使用
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 提供的一個消息隊列集成模塊,主要用于簡化與 RabbitMQ 的集成。它通過 AMQP 協(xié)議來實現(xiàn)消息的生產(chǎn)和消費。
- Publisher:生產(chǎn)者,不再發(fā)送消息到隊列中,而是發(fā)給交換機
- Exchange:交換機,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
1.導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.添加配置
在publisher以及consumer
服務的application.yml
中添加配置:
spring: rabbitmq: host: 192.168.150.101 # 你的虛擬機IP port: 5672 # 端口 virtual-host: /hmall # 虛擬主機 username: hmall # 用戶名 password: 123 # 密碼 listener: simple: prefetch: 1 # (能者多勞)每次只能獲取一條消息,處理完成才能獲取下一個消息
5672
端口用于 AMQP 協(xié)議,通常用于客戶端與 RabbitMQ 進行消息傳輸。,所以這里port綁定的是5672。
在 RabbitMQ 中,guest
用戶默認只能在 本地 連接到 RabbitMQ 實例。如果你希望使用 guest
用戶進行遠程連接(即從非本地機器連接 RabbitMQ),RabbitMQ 默認是 不允許 的。
3.在publisher服務中利用RabbitTemplate實現(xiàn)消息發(fā)送
RabbitTemplate 解釋:
RabbitTemplate
是 Spring AMQP 的核心實現(xiàn)類,它實現(xiàn)了 AmqpTemplate
接口,并且對 AmqpTemplate
提供了一些更高層次的封裝,簡化了消息發(fā)送和接收的操作。
RabbitTemplate
是大多數(shù) Spring AMQP 用戶的首選,它提供了很多便捷的方法和默認的行為,使得消息交互變得更加簡單。
主要方法:
convertAndSend
: 與AmqpTemplate
相同,提供了消息轉換并發(fā)送的功能。receiveAndConvert
: 從隊列中接收消息并轉換成 Java 對象。send
: 發(fā)送消息,不進行轉換。receive
: 從隊列中接收消息。
RabbitTemplate
為發(fā)送消息提供了更豐富的功能,如消息轉換器、默認交換機支持等,通常適用于大多數(shù)使用 Spring 的場景。
(1) 發(fā)送消息(convertAndSend)
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
exchange
:指定交換機的名稱。routingKey
:消息的路由鍵。message
:消息體,可以是任何對象,RabbitTemplate
會自動將其轉換為消息格式(例如,JSON、文本等)。
此方法的作用是將消息發(fā)送到指定交換機,并且根據(jù)給定的路由鍵路由到相關隊列。
實例:
rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");
這里消息 "支付成功的消息"
被發(fā)送到 pay.topic
交換機,使用 pay.success
路由鍵路由到對應的隊列。
(2)接收消息(receiveAndConvert)
public <T> T receiveAndConvert(String queueName);
queueName
:消息隊列的名稱。- 返回類型為
<T>
,表示接收到的消息會被轉換為指定的類型。
receiveAndConvert
會從指定的隊列接收消息,并自動將消息轉換成目標對象類型。如果消息體不是可轉換的,方法將拋出異常。
String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue"); System.out.println("收到的消息: " + message);
這里,pay.success.queue
隊列中的消息將被接收,并自動轉換為 String
類型。
還有一種就是 receive 方法:
public Message receive(String queueName);
queueName
:隊列名稱。- 返回的是
Message
類型,而不是直接轉換成對象。你可以通過Message
對象獲取消息的內容和其他屬性。
實例:
Message message = rabbitTemplate.receive("pay.success.queue"); if (message != null) { System.out.println("收到的消息: " + new String(message.getBody())); }
(3)發(fā)送消息并獲取響應(convertSendAndReceive)
public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
exchange
:交換機名稱。routingKey
:路由鍵。message
:發(fā)送的消息對象。
convertSendAndReceive
方法既發(fā)送消息到交換機,也等待從隊列返回響應。它會根據(jù)指定的路由鍵將消息發(fā)送到交換機,并等待響應消息,然后將響應轉換為返回類型。
實例:
String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "請求處理"); System.out.println("收到響應: " + response);
(4)發(fā)送消息到特定隊列
public void send(String exchange, String routingKey, Message message);
exchange
:交換機名稱。routingKey
:路由鍵。message
:消息對象(可以是Message
類型)。
send
方法允許你將一個 Message
對象發(fā)送到指定的交換機,并根據(jù)路由鍵進行路由。
Message message = new Message("支付成功".getBytes()); rabbitTemplate.send("pay.topic", "pay.success", message);
(5)設置消息監(jiān)聽器
RabbitTemplate
本身并不直接處理消息監(jiān)聽(接收消息),但是可以通過設置 RabbitListener
來監(jiān)聽消息,并將其與 RabbitTemplate
配合使用。一般來說,消息接收和處理是在 @RabbitListener
注解的監(jiān)聽器方法中完成的。
@RabbitListener(queues = "pay.success.queue") public void receiveMessage(String message) { System.out.println("接收到消息: " + message); }
下面看一個代碼實例:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 隊列名稱 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(queueName, message); } }
4.定義消費者實現(xiàn)異步調用
@Component @RequiredArgsConstructor public class PayStatusListener { private final IOrderService orderService; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "trade.pay.success.queue", durable = "true"), exchange = @Exchange(name = "pay.topic"), key = "pay.success" )) public void listenPaySuccess(Long orderId){ //調用方法 orderService.markOrderPaySuccess(orderId); } }
①@RequiredArgsConstructor
這是 Lombok 提供的注解,自動為類中所有 final
修飾的字段生成一個包含這些字段的構造函數(shù)。
使用這個注解可以免去手動編寫構造函數(shù)的麻煩,尤其是在使用依賴注入時(例如注入 IOrderService
)。
②@RabbitListener
@RabbitListener
注解用于監(jiān)聽來自 RabbitMQ 隊列的消息。
它會自動監(jiān)聽指定的隊列,當有消息到達時,會觸發(fā) listenPaySuccess
方法進行處理。
③QueueBinding(隊列綁定)
通過 @QueueBinding
注解,綁定了 隊列 和 交換機,并指定了 路由鍵。
@Queue
name = "trade.pay.success.queue"
:指定隊列的名稱為trade.pay.success.queue
。durable = "true"
:表示隊列是 持久化 的,即 RabbitMQ 重啟后隊列依然存在。- 隊列的作用:隊列是消息的臨時存儲地,消費者會從隊列中拉取消息并處理。
@Exchange
name = "pay.topic"
:指定交換機的名稱為pay.topic
,這是一個 Topic Exchange(主題交換機)。- 交換機的作用:交換機決定消息如何路由到隊列。Topic Exchange 可以根據(jù)路由鍵的匹配規(guī)則將消息路由到合適的隊列。
key = "pay.success"
- 路由鍵:指定了路由鍵為
pay.success
。這意味著當生產(chǎn)者發(fā)送的消息路由鍵是pay.success
時,消息將被路由到trade.pay.success.queue
隊列。
5.總流程處理過程
- 生產(chǎn)者:在支付成功后,生產(chǎn)者會發(fā)送一條消息到
pay.topic
交換機,消息的路由鍵為pay.success
。 - 交換機:
pay.topic
交換機會根據(jù)路由鍵pay.success
將消息路由到trade.pay.success.queue
隊列。 - 消費者:
PayStatusListener
作為消費者監(jiān)聽trade.pay.success.queue
,當有消息到達隊列時,它會接收到訂單 ID 并調用訂單服務更新訂單狀態(tài)。
五.使用配置類管理定義交換機,隊列及兩者關系
在 Spring AMQP 中,交換機(Exchange)、隊列(Queue)、以及綁定(Binding)可以通過配置類來定義和管理。配置類可以幫助你靈活地創(chuàng)建和綁定交換機與隊列,并且可以根據(jù)業(yè)務需求自定義各種參數(shù)。
創(chuàng)建配置類效果展示:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 創(chuàng)建隊列 @Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); // durable=true 表示隊列持久化 } // 創(chuàng)建交換機 @Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); // 創(chuàng)建主題交換機 } // 創(chuàng)建綁定關系(隊列與交換機通過 routing key 綁定) @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由鍵是 pay.success } }
1.創(chuàng)建隊列:Queue
@Bean public Queue queue() { return new Queue("trade.pay.success.queue", true); }
作用:通過 @Bean
注解定義了一個隊列 Bean。Spring 容器會自動管理這個隊列,并在 RabbitMQ 上創(chuàng)建該隊列。
參數(shù):
"trade.pay.success.queue"
:這是隊列的名稱。每個隊列在 RabbitMQ 中必須有唯一的名稱。true
:表示這個隊列是 持久化 的。持久化的隊列在 RabbitMQ 服務重啟后依然存在。- 隊列還可以設置其他屬性,例如是否自動刪除(
autoDelete
)、是否排他性(exclusive
)等,默認是false
。
2.創(chuàng)建交換機:Exchange
@Bean public TopicExchange exchange() { return new TopicExchange("pay.topic"); }
- 作用:通過
@Bean
注解定義了一個 Topic Exchange 類型的交換機。 - 參數(shù):
"pay.topic"
:這是交換機的名稱。同樣,交換機在 RabbitMQ 中也必須有唯一的名稱。TopicExchange
支持通過通配符(如*
和#
)進行更靈活的消息匹配。
Topic Exchange 是一種交換機類型,它允許使用通配符來進行路由。例如,路由鍵可以是 "pay.*"
,可以匹配 "pay.success"
或 "pay.failure"
。在這里可以使用四種交換機類型來定義交換機,具體場景具體分析使用。
3.創(chuàng)建綁定關系:Binding
@Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("pay.success"); }
作用:通過 @Bean
注解定義了隊列和交換機的綁定關系。這個綁定決定了消息在何種條件下會從交換機路由到隊列。
參數(shù):
queue
:隊列trade.pay.success.queue
被傳入,表示將該隊列與交換機綁定。exchange
:交換機pay.topic
被傳入,表示將隊列與該交換機綁定。.with("pay.success")
:指定了路由鍵pay.success
,路由鍵pay.success
會告訴交換機,如果生產(chǎn)者發(fā)送的消息帶有這個路由鍵,那么該消息就會路由到trade.pay.success.queue
隊列。
4.多個隊列綁定到同一個交換機
我們可以將多個隊列綁定到同一個交換機,并使用不同的路由鍵。這樣可以實現(xiàn)根據(jù)不同的路由鍵來發(fā)送不同類型的消息到各自的隊列。
@Bean public Queue paySuccessQueue() { return new Queue("pay.success.queue", true); } @Bean public Queue payFailureQueue() { return new Queue("pay.failure.queue", true); } @Bean public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) { return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success"); } @Bean public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) { return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure"); }
- 在這個例子中,
pay.success.queue
和pay.failure.queue
都綁定到同一個交換機pay.topic
,但使用不同的路由鍵。 - 消息路由邏輯:
- 當生產(chǎn)者發(fā)送路由鍵為
pay.success
的消息時,消息會路由到pay.success.queue
隊列。 - 當生產(chǎn)者發(fā)送路由鍵為
pay.failure
的消息時,消息會路由到pay.failure.queue
隊列。
- 當生產(chǎn)者發(fā)送路由鍵為
BindingBuilder.bind(paySuccessQueue)
:paySuccessQueue
參數(shù)就是綁定到交換機的隊列對象,也就是pay.success.queue
。它會被傳遞給BindingBuilder
,并且指定此隊列和交換機的綁定關系。.to(exchange)
:這里將隊列和交換機綁定,指定消息通過交換機路由到隊列。.with("pay.success")
:指定路由鍵為pay.success
,意思是只有路由鍵為pay.success
的消息才會被路由到paySuccessQueue
隊列中。
在 Spring AMQP 中,隊列對象(如 paySuccessQueue
)是由 @Bean
注解的 Queue
類型方法返回的。Spring 會自動將返回的隊列對象放入到 Spring 容器中,并注入到需要它的地方。所以當我們在 Binding
中使用 paySuccessQueue
時,實際上是在引用之前構造并注冊到 Spring 容器中的隊列實例。
在 Spring 的上下文中,paySuccessQueue
和 payFailureQueue
是已經(jīng)被創(chuàng)建并管理的隊列對象,我們不需要手動創(chuàng)建隊列,只需要在 Binding
中通過引用這些對象來建立隊列與交換機之間的關系。
5.配置不同類型的交換機
除了 Topic Exchange,RabbitMQ 還支持其他幾種常見的交換機類型。這里分別演示如何創(chuàng)建 Direct Exchange、Fanout Exchange 和 Headers Exchange。
(1)Direct Exchange
@Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); } @Bean public Binding directBinding(Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key"); }
Direct Exchange:直接交換機會根據(jù) 完全匹配的路由鍵 將消息發(fā)送到隊列。只有當消息的路由鍵和綁定的路由鍵 完全一致 時,消息才會被路由到指定隊列。
(2)Fanout Exchange
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange"); } @Bean public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); // 不需要路由鍵 }
Fanout Exchange:扇出交換機會將消息發(fā)送到所有綁定的隊列,不需要考慮路由鍵。這個交換機通常用于廣播消息。
(3)Headers Exchange
@Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange"); } @Bean public Binding headersBinding(Queue queue, HeadersExchange headersExchange) { return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value"); }
Headers Exchange:頭交換機根據(jù)消息頭的內容進行路由,而不是依賴路由鍵。適用于按消息的元數(shù)據(jù)進行路由的場景。
總結:
通過 Spring AMQP 的配置類,你可以非常靈活地定義 RabbitMQ 的 交換機、隊列 和 綁定關系,并通過不同的路由鍵和交換機類型實現(xiàn)復雜的消息路由邏輯。
以下是一些關鍵要點:
- 隊列(Queue):消息的臨時存儲地,可以是持久化的。
- 交換機(Exchange):控制消息如何分發(fā)到不同的隊列。
- Direct Exchange:嚴格匹配路由鍵。
- Topic Exchange:支持通配符匹配路由鍵。
- Fanout Exchange:廣播消息到所有綁定的隊列。
- Headers Exchange:根據(jù)消息頭的內容進行路由。
- 綁定(Binding):將隊列與交換機連接起來,使用路由鍵來決定消息的流向。
通過配置類來定義這些組件,能夠簡化 RabbitMQ 與 Spring 應用的集成,并且通過靈活的路由規(guī)則支持復雜的消息傳遞需求。
六.在Springboot項目中使用RabbitMQ解決高并發(fā)問題
在 Spring Boot 項目中使用 RabbitMQ 來實現(xiàn)消息傳輸,處理并發(fā)問題是非常常見的一種方式。RabbitMQ 可以通過解耦應用程序的不同部分,并將任務分發(fā)給多個消費者,從而有效地解決并發(fā)和負載均衡問題。
1.引入依賴
首先,需要在 pom.xml
文件中引入 spring-boot-starter-amqp
依賴,這樣 Spring Boot 就可以與 RabbitMQ 集成:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2.配置RabbitMQ
在 application.yml
或 application.properties
文件中,配置 RabbitMQ 的連接信息:
spring: rabbitmq: host: localhost port: 15672 username: hmall password: 123 virtual-host: / listener: simple: concurrency: 3 # 設置消費者的最小數(shù)量 max-concurrency: 10 # 設置消費者的最大數(shù)量
3.RabbitMQ配置類
創(chuàng)建一個配置類,用于聲明隊列、交換機、綁定等。
package com.example.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 聲明隊列 @Bean public Queue taskQueue() { return new Queue("taskQueue", true); // true表示隊列持久化 } // 聲明交換機 @Bean public TopicExchange topicExchange() { return new TopicExchange("taskExchange"); } // 綁定隊列和交換機 @Bean public Binding binding(Queue taskQueue, TopicExchange topicExchange) { return new Binding("taskQueue", Binding.DestinationType.QUEUE, "taskExchange", "task.#", // 使用路由鍵 null); } }
4.創(chuàng)建消息生產(chǎn)者(Producer)
創(chuàng)建一個消息生產(chǎn)者(Producer),它將消息發(fā)送到 RabbitMQ 隊列。
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TaskProducer { @Autowired private RabbitTemplate rabbitTemplate; // 發(fā)送消息 public void sendMessage(String message) { System.out.println("Sending message: " + message); rabbitTemplate.convertAndSend("taskExchange", "task.new", message); // 發(fā)送到交換機,使用路由鍵 } }
5.創(chuàng)建消息消費者(Consumer)
消費者從 RabbitMQ 隊列中異步接收消息,并進行并發(fā)處理。在消費者類中,使用 @RabbitListener
注解監(jiān)聽隊列,確保多個消費者可以同時處理消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TaskConsumer { // 監(jiān)聽隊列并異步處理消息 @RabbitListener(queues = "taskQueue") public void receiveMessage(String message) { System.out.println("Received message: " + message); // 模擬處理消息的業(yè)務邏輯 try { Thread.sleep(1000); // 模擬耗時操作 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Finished processing message: " + message); } }
6.增加并發(fā)消費者(多個消費者)
為了處理高并發(fā),我們可以通過設置 concurrency
和 max-concurrency
來控制消費者的最小和最大并發(fā)數(shù)。這樣,多個消費者可以同時處理來自隊列的消息。我們已經(jīng)在 application.yml
中配置了 concurrency
,現(xiàn)在的配置允許最多啟動 20 個消費者來處理消息。
- 最小并發(fā)數(shù):設置為 5,意味著在任何時候都會至少啟動 5 個消費者。
- 最大并發(fā)數(shù):設置為 20,表示 RabbitMQ 可以根據(jù)負載增加最多 20 個消費者。
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
解決java main函數(shù)中的args數(shù)組傳值問題
這篇文章主要介紹了解決java main函數(shù)中的args數(shù)組傳值問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02Spring Boot 簡單使用EhCache緩存框架的方法
本篇文章主要介紹了Spring Boot 簡單使用EhCache緩存框架的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-07-07Java?IO流之StringWriter和StringReader用法分析
這篇文章主要介紹了Java?IO流之StringWriter和StringReader用法分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉正(最新推薦)
北京時間2023年7月26日,IntelliJ IDEA 2023.2正式發(fā)布,IntelliJ IDEA 2023.2 引入 AI Assistant(AI助手),通過一組由 AI 提供支持的功能助力開發(fā),今天給大家分享IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉正,感興趣的朋友一起看看吧2023-10-10