欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

微服務(wù)架構(gòu)之使用RabbitMQ進(jìn)行異步處理方式

 更新時(shí)間:2025年02月05日 14:41:22   作者:記得開(kāi)心一點(diǎn)嘛  
本文介紹了RabbitMQ的基本概念、異步調(diào)用處理邏輯、RabbitMQ的基本使用方法以及在Spring Boot項(xiàng)目中使用RabbitMQ解決高并發(fā)問(wèn)題,RabbitMQ是一種流行的消息隊(duì)列實(shí)現(xiàn),支持異步通信,可以有效解耦應(yīng)用程序的不同部分,并將任務(wù)分發(fā)給多個(gè)消費(fèi)者

一.什么是RabbitMQ?

RabbitMQ 是一種流行的消息隊(duì)列(Message Queue)實(shí)現(xiàn),基于 AMQP 協(xié)議(Advanced Message Queuing Protocol)。它支持異步通信,使多個(gè)系統(tǒng)之間以非阻塞的方式交換數(shù)據(jù)。

在我們使用微服務(wù)的時(shí)候,微服務(wù)一旦拆分,必然涉及到服務(wù)之間的相互調(diào)用,目前我們服務(wù)之間調(diào)用采用的都是基于 OpenFeign 的調(diào)用。這種調(diào)用中,調(diào)用者發(fā)起請(qǐng)求后需要等待服務(wù)提供者執(zhí)行業(yè)務(wù)返回結(jié)果后,才能繼續(xù)執(zhí)行后面的業(yè)務(wù)。也就是說(shuō)調(diào)用者在調(diào)用過(guò)程中處于阻塞狀態(tài),因此我們稱這種調(diào)用方式為同步調(diào)用,也可以叫同步通。

如果我們的業(yè)務(wù)需要實(shí)時(shí)得到服務(wù)提供方的響應(yīng),則應(yīng)該選擇同步通訊(同步調(diào)用)。而如果我們追求更高的效率,并且不需要實(shí)時(shí)響應(yīng),則應(yīng)該選擇異步通訊(異步調(diào)用)。

二.異步調(diào)用處理邏輯:

異步調(diào)用方式其實(shí)就是基于消息通知的方式,一般包含三個(gè)角色:

  • 消息發(fā)送者:投遞消息的人,就是原來(lái)的調(diào)用方
  • 消息Broker:管理、暫存、轉(zhuǎn)發(fā)消息,你可以把它理解成微信服務(wù)器
  • 消息接收者:接收和處理消息的人,就是原來(lái)的服務(wù)提供方

除此之外還有:

  • Exchange(交換機(jī)):用于路由消息。RabbitMQ 有多種交換機(jī)類型(如 direct、topic、fanout、headers),它們決定了消息如何被傳遞到隊(duì)列。
  • Binding(綁定):連接交換機(jī)和隊(duì)列的規(guī)則。

在異步調(diào)用中,發(fā)送者不再直接同步調(diào)用接收者的業(yè)務(wù)接口,而是發(fā)送一條消息投遞給消息Broker。然后接收者根據(jù)自己的需求從消息Broker那里訂閱消息。每當(dāng)發(fā)送方發(fā)送消息后,接受者都能獲取消息并處理。這樣,發(fā)送消息的人和接收消息的人就完全解耦了。

異步調(diào)用的優(yōu)勢(shì)包括:

  • 耦合度更低
  • 性能更好
  • 業(yè)務(wù)拓展性強(qiáng)
  • 故障隔離,避免級(jí)聯(lián)失敗

當(dāng)然,異步通信也并非完美無(wú)缺,它存在下列缺點(diǎn):

  • 完全依賴于Broker的可靠性、安全性和性能
  • 架構(gòu)復(fù)雜,后期維護(hù)和調(diào)試麻煩

三.RabbitMQ的基本使用

下面是RabbitMQ的官網(wǎng):https://www.rabbitmq.com/

1.安裝

首先將RabbitMQ的鏡像拉取下來(lái),然后運(yùn)行下面命令:

docker run \
 -e RABBITMQ_DEFAULT_USER=hmall \
 -e RABBITMQ_DEFAULT_PASS=123 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

docker run 命令中,您將容器的兩個(gè)端口暴露到主機(jī)上:

  • -p 15672:15672:將容器內(nèi)部的 15672 端口映射到主機(jī)的 15672 端口。這個(gè)端口通常用于 RabbitMQ 的 Web 管理控制臺(tái)(Management Console)。
  • -p 5672:5672:將容器內(nèi)部的 5672 端口映射到主機(jī)的 5672 端口。這個(gè)端口用于 AMQP 協(xié)議,即客戶端與 RabbitMQ 進(jìn)行消息傳輸?shù)亩丝凇?/li>

隨后我們?cè)L問(wèn)http://虛擬機(jī)IP地址:15672來(lái)打開(kāi)RabbitMQ的控制臺(tái),默認(rèn)賬號(hào)密碼是 guest / guest。

在控制臺(tái)上主要可以關(guān)注三個(gè)信息:Exchanges(交換機(jī)),Queues(隊(duì)列),Admin(用戶管理)。

2.架構(gòu)圖

其中包含幾個(gè)概念:

  • publisher:生產(chǎn)者,也就是發(fā)送消息的一方
  • consumer:消費(fèi)者,也就是消費(fèi)消息的一方
  • queue:隊(duì)列,存儲(chǔ)消息。生產(chǎn)者投遞的消息會(huì)暫存在消息隊(duì)列中,等待消費(fèi)者處理
  • exchange:交換機(jī),負(fù)責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機(jī)決定投遞到哪個(gè)隊(duì)列。
  • virtual host:虛擬主機(jī),起到數(shù)據(jù)隔離的作用。每個(gè)虛擬主機(jī)相互獨(dú)立,有各自的exchange、queue

RabbitMQ 使用 AMQP 協(xié)議,核心的消息模型包括:

  1. Producer 將消息發(fā)送到 Exchange。
  2. Exchange 將消息路由到 Queue
  3. ConsumerQueue 獲取消息進(jìn)行處理。

RabbitMQ 消息生命周期

  1. 消息發(fā)布:Producer 將消息發(fā)送到 RabbitMQ。
  2. 消息存儲(chǔ):消息通過(guò)交換機(jī)路由到一個(gè)或多個(gè)隊(duì)列,隊(duì)列暫存這些消息。
  3. 消息消費(fèi):Consumer 從隊(duì)列中獲取并處理消息。

3.RabbitMQ控制臺(tái)的使用

RabbitMQ 中,交換機(jī)(Exchange)隊(duì)列(Queue) 是核心概念。它們之間的關(guān)系決定了消息的路由和存儲(chǔ)方式。

(1)Exchanges 交換機(jī)

  • 交換機(jī)是 消息的路由器,負(fù)責(zé)決定消息應(yīng)該被發(fā)送到哪個(gè)隊(duì)列。
  • 生產(chǎn)者將消息發(fā)送給交換機(jī),而不是直接發(fā)送到隊(duì)列。
  • 交換機(jī)根據(jù)路由規(guī)則 決定消息的走向(即發(fā)往哪些隊(duì)列)。

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

RabbitMQ 提供了四種常用的交換機(jī)類型,每種類型的路由規(guī)則不同:

  • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。我們最早在控制臺(tái)使用的正是Fanout交換機(jī)。
  • Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊(duì)列
  • Topic:通配符訂閱,與Direct類似,只不過(guò)RoutingKey可以使用通配符,可以進(jìn)行模糊匹配。
  • Headers:頭匹配,基于MQ的 header 信息來(lái)路由消息,用的較少。

我們可以再這里創(chuàng)建交換機(jī),Name表示創(chuàng)建的交換機(jī)的名字,Type表示可以選擇交換機(jī)的四種類型。創(chuàng)建成功后就可以在上面看到創(chuàng)建的交換機(jī)名字:

比如我們點(diǎn)擊amq.fanout查看交換機(jī)數(shù)據(jù)并且可以發(fā)送消息給消費(fèi)者。

注意!??!如果我們不將交換機(jī)指定隊(duì)列的話,由于沒(méi)有消費(fèi)者存在,最終消息丟失了,這樣說(shuō)明交換機(jī)沒(méi)有存儲(chǔ)消息的能力。

所以下面我們要先創(chuàng)建隊(duì)列,然后讓生產(chǎn)者推送的消息經(jīng)過(guò)交換機(jī)的傳遞后,到達(dá)消息隊(duì)列,然后再給消費(fèi)者。所以生產(chǎn)者無(wú)需知道隊(duì)列的存在以此來(lái)達(dá)到解耦的效果。

(2)Queues 隊(duì)列

  • 隊(duì)列用于 存儲(chǔ)消息,直到消費(fèi)者消費(fèi)它們。
  • 隊(duì)列與消費(fèi)者一一對(duì)應(yīng),即一個(gè)消費(fèi)者從一個(gè)隊(duì)列讀取消息。
  • 隊(duì)列按 FIFO(First In, First Out)的順序存儲(chǔ)消息。

交換機(jī)與隊(duì)列的關(guān)系:

交換機(jī)(Exchange)是消息的路由器,它決定了將消息發(fā)送到哪個(gè)隊(duì)列。隊(duì)列(Queue)是消息的存儲(chǔ)和處理地方。交換機(jī)本身并不知道具體的隊(duì)列,它只是通過(guò)綁定(Binding)來(lái)決定消息應(yīng)該被路由到哪些隊(duì)列。

交換機(jī)將消息通過(guò)路由鍵(Routing Key)發(fā)送到綁定的隊(duì)列,但交換機(jī)和隊(duì)列之間的連接并不是自動(dòng)的,需要顯式地設(shè)置綁定。綁定指定了 交換機(jī)隊(duì)列 之間的關(guān)系,以及 路由規(guī)則(例如,路由鍵匹配的規(guī)則)。

在這里我們填寫(xiě)隊(duì)列名字即可,其他暫時(shí)可以不用填寫(xiě)。

隨后我們向交換機(jī)進(jìn)行綁定(bind)隊(duì)列,隨后通過(guò)隊(duì)列傳輸給消費(fèi)者。

這里的Routing key的出現(xiàn)是為了讓 Direct (交換機(jī)的類型)能夠選擇隊(duì)列而存在的。

我們?cè)诮壎?duì)列完成后會(huì)出現(xiàn)下面這樣,這樣證明我們成功為交換機(jī)綁定好兩個(gè)隊(duì)列:

隨后我們?cè)谙旅娲翱谕扑拖ⅲ?/p>

(3)Admin

①Users(用戶管理):

管理 RabbitMQ 中的用戶賬號(hào),在這里 添加、刪除用戶,并設(shè)置每個(gè)用戶的權(quán)限。

每個(gè)用戶可分配不同的 角色

  • administrator:管理員,具有所有權(quán)限。
  • monitoring:可以監(jiān)控和查看信息,但不能管理。
  • policymaker:可以設(shè)置策略和參數(shù)。
  • management:可以訪問(wèn)管理界面但沒(méi)有策略權(quán)限。

  • Nameitheima,也就是用戶名
  • Tagsadministrator,說(shuō)明itheima用戶是超級(jí)管理員,擁有所有權(quán)限
  • Can access virtual host/,可以訪問(wèn)的virtual host,這里的/是默認(rèn)的virtual host

②Virtual Hosts(虛擬主機(jī)):

將 RabbitMQ 服務(wù)器劃分為多個(gè) 虛擬主機(jī)(vhost),類似于一個(gè)獨(dú)立的命名空間。

  • 不同的應(yīng)用可以使用不同的虛擬主機(jī),彼此隔離。
  • 每個(gè)虛擬主機(jī)都有自己的 交換機(jī)、隊(duì)列和用戶權(quán)限。

四.SpringAMOP的使用

Spring AMQPSpring for Advanced Message Queuing Protocol)是 Spring 提供的一個(gè)消息隊(duì)列集成模塊,主要用于簡(jiǎn)化與 RabbitMQ 的集成。它通過(guò) AMQP 協(xié)議來(lái)實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)。

  • Publisher:生產(chǎn)者,不再發(fā)送消息到隊(duì)列中,而是發(fā)給交換機(jī)
  • Exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
  • Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。不過(guò)隊(duì)列一定要與交換機(jī)綁定。
  • Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒(méi)有變化

1.導(dǎo)入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.添加配置

publisher以及consumer服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虛擬機(jī)IP
    port: 5672 # 端口
    virtual-host: /hmall # 虛擬主機(jī)
    username: hmall # 用戶名
    password: 123 # 密碼
    listener:
      simple:
        prefetch: 1 # (能者多勞)每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息

5672 端口用于 AMQP 協(xié)議,通常用于客戶端與 RabbitMQ 進(jìn)行消息傳輸。,所以這里port綁定的是5672。

在 RabbitMQ 中,guest 用戶默認(rèn)只能在 本地 連接到 RabbitMQ 實(shí)例。如果你希望使用 guest 用戶進(jìn)行遠(yuǎn)程連接(即從非本地機(jī)器連接 RabbitMQ),RabbitMQ 默認(rèn)是 不允許 的。

3.在publisher服務(wù)中利用RabbitTemplate實(shí)現(xiàn)消息發(fā)送

RabbitTemplate 解釋:

RabbitTemplate 是 Spring AMQP 的核心實(shí)現(xiàn)類,它實(shí)現(xiàn)了 AmqpTemplate 接口,并且對(duì) AmqpTemplate 提供了一些更高層次的封裝,簡(jiǎn)化了消息發(fā)送和接收的操作。

RabbitTemplate 是大多數(shù) Spring AMQP 用戶的首選,它提供了很多便捷的方法和默認(rèn)的行為,使得消息交互變得更加簡(jiǎn)單。

主要方法:

  • convertAndSend: 與 AmqpTemplate 相同,提供了消息轉(zhuǎn)換并發(fā)送的功能。
  • receiveAndConvert: 從隊(duì)列中接收消息并轉(zhuǎn)換成 Java 對(duì)象。
  • send: 發(fā)送消息,不進(jìn)行轉(zhuǎn)換。
  • receive: 從隊(duì)列中接收消息。

RabbitTemplate 為發(fā)送消息提供了更豐富的功能,如消息轉(zhuǎn)換器、默認(rèn)交換機(jī)支持等,通常適用于大多數(shù)使用 Spring 的場(chǎng)景。

(1) 發(fā)送消息(convertAndSend)

rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);
  • exchange:指定交換機(jī)的名稱。
  • routingKey:消息的路由鍵。
  • message:消息體,可以是任何對(duì)象,RabbitTemplate 會(huì)自動(dòng)將其轉(zhuǎn)換為消息格式(例如,JSON、文本等)。

此方法的作用是將消息發(fā)送到指定交換機(jī),并且根據(jù)給定的路由鍵路由到相關(guān)隊(duì)列。

實(shí)例:

rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");

這里消息 "支付成功的消息" 被發(fā)送到 pay.topic 交換機(jī),使用 pay.success 路由鍵路由到對(duì)應(yīng)的隊(duì)列。

(2)接收消息(receiveAndConvert)

public <T> T receiveAndConvert(String queueName);
  • queueName:消息隊(duì)列的名稱。
  • 返回類型為 <T>,表示接收到的消息會(huì)被轉(zhuǎn)換為指定的類型。

receiveAndConvert 會(huì)從指定的隊(duì)列接收消息,并自動(dòng)將消息轉(zhuǎn)換成目標(biāo)對(duì)象類型。如果消息體不是可轉(zhuǎn)換的,方法將拋出異常。

String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue");
System.out.println("收到的消息: " + message);

這里,pay.success.queue 隊(duì)列中的消息將被接收,并自動(dòng)轉(zhuǎn)換為 String 類型。

還有一種就是 receive 方法:

public Message receive(String queueName);
  • queueName:隊(duì)列名稱。
  • 返回的是 Message 類型,而不是直接轉(zhuǎn)換成對(duì)象。你可以通過(guò) Message 對(duì)象獲取消息的內(nèi)容和其他屬性。

實(shí)例:

Message message = rabbitTemplate.receive("pay.success.queue");
if (message != null) {
    System.out.println("收到的消息: " + new String(message.getBody()));
}

(3)發(fā)送消息并獲取響應(yīng)(convertSendAndReceive)

public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);
  • exchange:交換機(jī)名稱。
  • routingKey:路由鍵。
  • message:發(fā)送的消息對(duì)象。

convertSendAndReceive 方法既發(fā)送消息到交換機(jī),也等待從隊(duì)列返回響應(yīng)。它會(huì)根據(jù)指定的路由鍵將消息發(fā)送到交換機(jī),并等待響應(yīng)消息,然后將響應(yīng)轉(zhuǎn)換為返回類型。

實(shí)例:

String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "請(qǐng)求處理");
System.out.println("收到響應(yīng): " + response);

(4)發(fā)送消息到特定隊(duì)列

public void send(String exchange, String routingKey, Message message);
  • exchange:交換機(jī)名稱。
  • routingKey:路由鍵。
  • message:消息對(duì)象(可以是 Message 類型)。

send 方法允許你將一個(gè) Message 對(duì)象發(fā)送到指定的交換機(jī),并根據(jù)路由鍵進(jìn)行路由。

Message message = new Message("支付成功".getBytes());
rabbitTemplate.send("pay.topic", "pay.success", message);

(5)設(shè)置消息監(jiān)聽(tīng)器

RabbitTemplate 本身并不直接處理消息監(jiān)聽(tīng)(接收消息),但是可以通過(guò)設(shè)置 RabbitListener 來(lái)監(jiān)聽(tīng)消息,并將其與 RabbitTemplate 配合使用。一般來(lái)說(shuō),消息接收和處理是在 @RabbitListener 注解的監(jiān)聽(tīng)器方法中完成的。

@RabbitListener(queues = "pay.success.queue")
public void receiveMessage(String message) {
    System.out.println("接收到消息: " + message);
}

下面看一個(gè)代碼實(shí)例:

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 隊(duì)列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

4.定義消費(fèi)者實(shí)現(xiàn)異步調(diào)用

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic"),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        //調(diào)用方法
        orderService.markOrderPaySuccess(orderId);
    }
}

①@RequiredArgsConstructor

這是 Lombok 提供的注解,自動(dòng)為類中所有 final 修飾的字段生成一個(gè)包含這些字段的構(gòu)造函數(shù)。

使用這個(gè)注解可以免去手動(dòng)編寫(xiě)構(gòu)造函數(shù)的麻煩,尤其是在使用依賴注入時(shí)(例如注入 IOrderService)。

②@RabbitListener

@RabbitListener 注解用于監(jiān)聽(tīng)來(lái)自 RabbitMQ 隊(duì)列的消息。

它會(huì)自動(dòng)監(jiān)聽(tīng)指定的隊(duì)列,當(dāng)有消息到達(dá)時(shí),會(huì)觸發(fā) listenPaySuccess 方法進(jìn)行處理。

③QueueBinding(隊(duì)列綁定)

通過(guò) @QueueBinding 注解,綁定了 隊(duì)列交換機(jī),并指定了 路由鍵。

@Queue

  • name = "trade.pay.success.queue":指定隊(duì)列的名稱為 trade.pay.success.queue。
  • durable = "true":表示隊(duì)列是 持久化 的,即 RabbitMQ 重啟后隊(duì)列依然存在。
  • 隊(duì)列的作用:隊(duì)列是消息的臨時(shí)存儲(chǔ)地,消費(fèi)者會(huì)從隊(duì)列中拉取消息并處理。

@Exchange

  • name = "pay.topic":指定交換機(jī)的名稱為 pay.topic,這是一個(gè) Topic Exchange(主題交換機(jī))。
  • 交換機(jī)的作用:交換機(jī)決定消息如何路由到隊(duì)列。Topic Exchange 可以根據(jù)路由鍵的匹配規(guī)則將消息路由到合適的隊(duì)列。

key = "pay.success"

  • 路由鍵:指定了路由鍵為 pay.success。這意味著當(dāng)生產(chǎn)者發(fā)送的消息路由鍵是 pay.success 時(shí),消息將被路由到 trade.pay.success.queue 隊(duì)列。

5.總流程處理過(guò)程

  • 生產(chǎn)者:在支付成功后,生產(chǎn)者會(huì)發(fā)送一條消息到 pay.topic 交換機(jī),消息的路由鍵為 pay.success。
  • 交換機(jī)pay.topic 交換機(jī)會(huì)根據(jù)路由鍵 pay.success 將消息路由到 trade.pay.success.queue 隊(duì)列。
  • 消費(fèi)者PayStatusListener 作為消費(fèi)者監(jiān)聽(tīng) trade.pay.success.queue,當(dāng)有消息到達(dá)隊(duì)列時(shí),它會(huì)接收到訂單 ID 并調(diào)用訂單服務(wù)更新訂單狀態(tài)。

五.使用配置類管理定義交換機(jī),隊(duì)列及兩者關(guān)系

在 Spring AMQP 中,交換機(jī)(Exchange)、隊(duì)列(Queue)、以及綁定(Binding)可以通過(guò)配置類來(lái)定義和管理。配置類可以幫助你靈活地創(chuàng)建和綁定交換機(jī)與隊(duì)列,并且可以根據(jù)業(yè)務(wù)需求自定義各種參數(shù)。

創(chuàng)建配置類效果展示:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 創(chuàng)建隊(duì)列
    @Bean
    public Queue queue() {
        return new Queue("trade.pay.success.queue", true); // durable=true 表示隊(duì)列持久化
    }

    // 創(chuàng)建交換機(jī)
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("pay.topic"); // 創(chuàng)建主題交換機(jī)
    }

    // 創(chuàng)建綁定關(guān)系(隊(duì)列與交換機(jī)通過(guò) routing key 綁定)
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由鍵是 pay.success
    }
}

1.創(chuàng)建隊(duì)列:Queue

@Bean
public Queue queue() {
    return new Queue("trade.pay.success.queue", true);
}

作用:通過(guò) @Bean 注解定義了一個(gè)隊(duì)列 Bean。Spring 容器會(huì)自動(dòng)管理這個(gè)隊(duì)列,并在 RabbitMQ 上創(chuàng)建該隊(duì)列。

參數(shù)

  • "trade.pay.success.queue":這是隊(duì)列的名稱。每個(gè)隊(duì)列在 RabbitMQ 中必須有唯一的名稱。
  • true:表示這個(gè)隊(duì)列是 持久化 的。持久化的隊(duì)列在 RabbitMQ 服務(wù)重啟后依然存在。
  • 隊(duì)列還可以設(shè)置其他屬性,例如是否自動(dòng)刪除(autoDelete)、是否排他性(exclusive)等,默認(rèn)是 false

2.創(chuàng)建交換機(jī):Exchange

@Bean
public TopicExchange exchange() {
    return new TopicExchange("pay.topic");
}
  • 作用:通過(guò) @Bean 注解定義了一個(gè) Topic Exchange 類型的交換機(jī)。
  • 參數(shù)"pay.topic":這是交換機(jī)的名稱。同樣,交換機(jī)在 RabbitMQ 中也必須有唯一的名稱。TopicExchange 支持通過(guò)通配符(如 *#)進(jìn)行更靈活的消息匹配。

Topic Exchange 是一種交換機(jī)類型,它允許使用通配符來(lái)進(jìn)行路由。例如,路由鍵可以是 "pay.*",可以匹配 "pay.success""pay.failure"。在這里可以使用四種交換機(jī)類型來(lái)定義交換機(jī),具體場(chǎng)景具體分析使用。

3.創(chuàng)建綁定關(guān)系:Binding

@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("pay.success");
}

作用:通過(guò) @Bean 注解定義了隊(duì)列和交換機(jī)的綁定關(guān)系。這個(gè)綁定決定了消息在何種條件下會(huì)從交換機(jī)路由到隊(duì)列。

參數(shù)

  • queue:隊(duì)列 trade.pay.success.queue 被傳入,表示將該隊(duì)列與交換機(jī)綁定。
  • exchange:交換機(jī) pay.topic 被傳入,表示將隊(duì)列與該交換機(jī)綁定。
  • .with("pay.success"):指定了路由鍵 pay.success,路由鍵 pay.success 會(huì)告訴交換機(jī),如果生產(chǎn)者發(fā)送的消息帶有這個(gè)路由鍵,那么該消息就會(huì)路由到 trade.pay.success.queue 隊(duì)列。

4.多個(gè)隊(duì)列綁定到同一個(gè)交換機(jī)

我們可以將多個(gè)隊(duì)列綁定到同一個(gè)交換機(jī),并使用不同的路由鍵。這樣可以實(shí)現(xiàn)根據(jù)不同的路由鍵來(lái)發(fā)送不同類型的消息到各自的隊(duì)列。

@Bean
public Queue paySuccessQueue() {
    return new Queue("pay.success.queue", true);
}

@Bean
public Queue payFailureQueue() {
    return new Queue("pay.failure.queue", true);
}

@Bean
public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
    return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
}

@Bean
public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
    return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
}
  • 在這個(gè)例子中,pay.success.queuepay.failure.queue 都綁定到同一個(gè)交換機(jī) pay.topic,但使用不同的路由鍵。
  • 消息路由邏輯
    • 當(dāng)生產(chǎn)者發(fā)送路由鍵為 pay.success 的消息時(shí),消息會(huì)路由到 pay.success.queue 隊(duì)列。
    • 當(dāng)生產(chǎn)者發(fā)送路由鍵為 pay.failure 的消息時(shí),消息會(huì)路由到 pay.failure.queue 隊(duì)列。
  • BindingBuilder.bind(paySuccessQueue)paySuccessQueue 參數(shù)就是綁定到交換機(jī)的隊(duì)列對(duì)象,也就是 pay.success.queue。它會(huì)被傳遞給 BindingBuilder,并且指定此隊(duì)列和交換機(jī)的綁定關(guān)系。
  • .to(exchange):這里將隊(duì)列和交換機(jī)綁定,指定消息通過(guò)交換機(jī)路由到隊(duì)列。
  • .with("pay.success"):指定路由鍵為 pay.success,意思是只有路由鍵為 pay.success 的消息才會(huì)被路由到 paySuccessQueue 隊(duì)列中。

在 Spring AMQP 中,隊(duì)列對(duì)象(如 paySuccessQueue)是由 @Bean 注解的 Queue 類型方法返回的。Spring 會(huì)自動(dòng)將返回的隊(duì)列對(duì)象放入到 Spring 容器中,并注入到需要它的地方。所以當(dāng)我們?cè)?Binding 中使用 paySuccessQueue 時(shí),實(shí)際上是在引用之前構(gòu)造并注冊(cè)到 Spring 容器中的隊(duì)列實(shí)例。

在 Spring 的上下文中,paySuccessQueuepayFailureQueue 是已經(jīng)被創(chuàng)建并管理的隊(duì)列對(duì)象,我們不需要手動(dòng)創(chuàng)建隊(duì)列,只需要在 Binding 中通過(guò)引用這些對(duì)象來(lái)建立隊(duì)列與交換機(jī)之間的關(guān)系。

5.配置不同類型的交換機(jī)

除了 Topic Exchange,RabbitMQ 還支持其他幾種常見(jiàn)的交換機(jī)類型。這里分別演示如何創(chuàng)建 Direct ExchangeFanout ExchangeHeaders Exchange。

(1)Direct Exchange

@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange");
}

@Bean
public Binding directBinding(Queue queue, DirectExchange directExchange) {
    return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
}

Direct Exchange:直接交換機(jī)會(huì)根據(jù) 完全匹配的路由鍵 將消息發(fā)送到隊(duì)列。只有當(dāng)消息的路由鍵和綁定的路由鍵 完全一致 時(shí),消息才會(huì)被路由到指定隊(duì)列。

(2)Fanout Exchange

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}

@Bean
public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue).to(fanoutExchange);  // 不需要路由鍵
}

Fanout Exchange:扇出交換機(jī)會(huì)將消息發(fā)送到所有綁定的隊(duì)列,不需要考慮路由鍵。這個(gè)交換機(jī)通常用于廣播消息。

(3)Headers Exchange

@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

@Bean
public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
    return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
}

Headers Exchange:頭交換機(jī)根據(jù)消息頭的內(nèi)容進(jìn)行路由,而不是依賴路由鍵。適用于按消息的元數(shù)據(jù)進(jìn)行路由的場(chǎng)景。

總結(jié):

通過(guò) Spring AMQP 的配置類,你可以非常靈活地定義 RabbitMQ 的 交換機(jī)、隊(duì)列綁定關(guān)系,并通過(guò)不同的路由鍵和交換機(jī)類型實(shí)現(xiàn)復(fù)雜的消息路由邏輯。

以下是一些關(guān)鍵要點(diǎn):

  1. 隊(duì)列(Queue):消息的臨時(shí)存儲(chǔ)地,可以是持久化的。
  2. 交換機(jī)(Exchange):控制消息如何分發(fā)到不同的隊(duì)列。
    • Direct Exchange:嚴(yán)格匹配路由鍵。
    • Topic Exchange:支持通配符匹配路由鍵。
    • Fanout Exchange:廣播消息到所有綁定的隊(duì)列。
    • Headers Exchange:根據(jù)消息頭的內(nèi)容進(jìn)行路由。
  3. 綁定(Binding):將隊(duì)列與交換機(jī)連接起來(lái),使用路由鍵來(lái)決定消息的流向。

通過(guò)配置類來(lái)定義這些組件,能夠簡(jiǎn)化 RabbitMQ 與 Spring 應(yīng)用的集成,并且通過(guò)靈活的路由規(guī)則支持復(fù)雜的消息傳遞需求。

六.在Springboot項(xiàng)目中使用RabbitMQ解決高并發(fā)問(wèn)題

在 Spring Boot 項(xiàng)目中使用 RabbitMQ 來(lái)實(shí)現(xiàn)消息傳輸,處理并發(fā)問(wèn)題是非常常見(jiàn)的一種方式。RabbitMQ 可以通過(guò)解耦應(yīng)用程序的不同部分,并將任務(wù)分發(fā)給多個(gè)消費(fèi)者,從而有效地解決并發(fā)和負(fù)載均衡問(wèn)題。

1.引入依賴

首先,需要在 pom.xml 文件中引入 spring-boot-starter-amqp 依賴,這樣 Spring Boot 就可以與 RabbitMQ 集成:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2.配置RabbitMQ

application.ymlapplication.properties 文件中,配置 RabbitMQ 的連接信息:

spring:
  rabbitmq:
    host: localhost
    port: 15672
    username: hmall
    password: 123
    virtual-host: /
    listener:
      simple:
        concurrency: 3  # 設(shè)置消費(fèi)者的最小數(shù)量
        max-concurrency: 10  # 設(shè)置消費(fèi)者的最大數(shù)量

3.RabbitMQ配置類

創(chuàng)建一個(gè)配置類,用于聲明隊(duì)列、交換機(jī)、綁定等。

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 聲明隊(duì)列
    @Bean
    public Queue taskQueue() {
        return new Queue("taskQueue", true);  // true表示隊(duì)列持久化
    }

    // 聲明交換機(jī)
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("taskExchange");
    }

    // 綁定隊(duì)列和交換機(jī)
    @Bean
    public Binding binding(Queue taskQueue, TopicExchange topicExchange) {
        return new Binding("taskQueue",
                Binding.DestinationType.QUEUE,
                "taskExchange",
                "task.#", // 使用路由鍵
                null);
    }
}

4.創(chuàng)建消息生產(chǎn)者(Producer)

創(chuàng)建一個(gè)消息生產(chǎn)者(Producer),它將消息發(fā)送到 RabbitMQ 隊(duì)列。

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 發(fā)送消息
    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend("taskExchange", "task.new", message);  // 發(fā)送到交換機(jī),使用路由鍵
    }
}

5.創(chuàng)建消息消費(fèi)者(Consumer)

消費(fèi)者從 RabbitMQ 隊(duì)列中異步接收消息,并進(jìn)行并發(fā)處理。在消費(fèi)者類中,使用 @RabbitListener 注解監(jiān)聽(tīng)隊(duì)列,確保多個(gè)消費(fèi)者可以同時(shí)處理消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TaskConsumer {

    // 監(jiān)聽(tīng)隊(duì)列并異步處理消息
    @RabbitListener(queues = "taskQueue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        
        // 模擬處理消息的業(yè)務(wù)邏輯
        try {
            Thread.sleep(1000);  // 模擬耗時(shí)操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Finished processing message: " + message);
    }
}

6.增加并發(fā)消費(fèi)者(多個(gè)消費(fèi)者)

為了處理高并發(fā),我們可以通過(guò)設(shè)置 concurrencymax-concurrency 來(lái)控制消費(fèi)者的最小和最大并發(fā)數(shù)。這樣,多個(gè)消費(fèi)者可以同時(shí)處理來(lái)自隊(duì)列的消息。我們已經(jīng)在 application.yml 中配置了 concurrency,現(xiàn)在的配置允許最多啟動(dòng) 20 個(gè)消費(fèi)者來(lái)處理消息。

  • 最小并發(fā)數(shù):設(shè)置為 5,意味著在任何時(shí)候都會(huì)至少啟動(dòng) 5 個(gè)消費(fèi)者。
  • 最大并發(fā)數(shù):設(shè)置為 20,表示 RabbitMQ 可以根據(jù)負(fù)載增加最多 20 個(gè)消費(fèi)者。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java中1+1d/5和1+1/5的區(qū)別說(shuō)明

    java中1+1d/5和1+1/5的區(qū)別說(shuō)明

    這篇文章主要介紹了java中1+1d/5和1+1/5的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java的訪問(wèn)修飾符與變量的作用域講解

    Java的訪問(wèn)修飾符與變量的作用域講解

    這篇文章主要介紹了Java的訪問(wèn)修飾符與變量的作用域講解,是Java入門(mén)學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-09-09
  • 解決java main函數(shù)中的args數(shù)組傳值問(wèn)題

    解決java main函數(shù)中的args數(shù)組傳值問(wèn)題

    這篇文章主要介紹了解決java main函數(shù)中的args數(shù)組傳值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Java跳出多重嵌套循環(huán)過(guò)程解析

    Java跳出多重嵌套循環(huán)過(guò)程解析

    這篇文章主要介紹了Java跳出多重嵌套循環(huán)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • 一文詳解Java抽象類到底有多抽象

    一文詳解Java抽象類到底有多抽象

    這篇文章主要介紹了一文詳解Java抽象類到底有多抽象,抽象方法所在的類必須是抽象類,子類若繼承了一個(gè)抽象類,就必須覆寫(xiě)父類的所有抽象方法,這里的子類是普通類,是強(qiáng)制要求覆寫(xiě)所有抽象方法,但是如果子類也是一個(gè)抽象類,那么就可以不覆寫(xiě)
    2022-06-06
  • Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法

    Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法

    本篇文章主要介紹了Spring Boot 簡(jiǎn)單使用EhCache緩存框架的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07
  • SpringBoot 自定義注解異步記錄復(fù)雜日志詳解

    SpringBoot 自定義注解異步記錄復(fù)雜日志詳解

    這篇文章主要為大家介紹了SpringBoot 自定義注解異步記錄復(fù)雜日志詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • MyBatis不用@Param傳遞多個(gè)參數(shù)的操作

    MyBatis不用@Param傳遞多個(gè)參數(shù)的操作

    這篇文章主要介紹了MyBatis不用@Param傳遞多個(gè)參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Java?IO流之StringWriter和StringReader用法分析

    Java?IO流之StringWriter和StringReader用法分析

    這篇文章主要介紹了Java?IO流之StringWriter和StringReader用法分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正(最新推薦)

    IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正(最新推薦)

    北京時(shí)間2023年7月26日,IntelliJ IDEA 2023.2正式發(fā)布,IntelliJ IDEA 2023.2 引入 AI Assistant(AI助手),通過(guò)一組由 AI 提供支持的功能助力開(kāi)發(fā),今天給大家分享IntelliJ IDEA 2023.2正式發(fā)布新UI和Profiler轉(zhuǎn)正,感興趣的朋友一起看看吧
    2023-10-10

最新評(píng)論