SpringBoot整合RabbitMQ實現(xiàn)RPC遠程調(diào)用功能
1. 交互過程
- 啟動 RPC 服務(wù)端和客戶端,創(chuàng)建連接和通道。
- 聲明請求隊列和回復(fù)隊列,確保使用相同的隊列名稱。
- 客戶端發(fā)送請求:客戶端將請求消息發(fā)送到指定的
請求隊列中。 - 服務(wù)端監(jiān)聽請求隊列:服務(wù)端在指定的
請求隊列上監(jiān)聽請求消息。 - 服務(wù)端接收請求:服務(wù)端接收到客戶端發(fā)送的請求消息。
- 服務(wù)端處理請求:服務(wù)端根據(jù)請求消息中的參數(shù),執(zhí)行相應(yīng)的業(yè)務(wù)邏輯,并得到處理結(jié)果。
- 服務(wù)端發(fā)送響應(yīng):服務(wù)端將處理結(jié)果作為響應(yīng)消息發(fā)送到客戶端指定的回復(fù)隊列中。
- 客戶端監(jiān)聽響應(yīng)隊列:客戶端在指定的回復(fù)隊列上監(jiān)聽響應(yīng)消息。
- 客戶端接收響應(yīng):客戶端接收到服務(wù)端發(fā)送的響應(yīng)消息。
- 客戶端處理響應(yīng):客戶端根據(jù)響應(yīng)消息中的結(jié)果進行相應(yīng)的處理。
2. 導(dǎo)入依賴
創(chuàng)建一個SpringBoot項目并導(dǎo)入依賴坐標(biāo)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
</dependencies>application.yml
spring:
rabbitmq:
host: localhost # 主機名
port: 5672 # 端口
username: guest # 用戶名
password: guest # 密碼
virtual-host: /
template:
receive-timeout: 2000
reply-timeout: 2000
listener:
simple:
concurrency: 1
max-concurrency: 3
prefetch: 1 # 消費者每次只能預(yù)取1條數(shù)據(jù)到內(nèi)存并處理,默認(rèn)為250條
acknowledge-mode: manual # 確定機制 manual:手動確認(rèn)
publisher-returns: true
publisher-confirm-type: correlated注意:需要提前開啟
RabbitMQ服務(wù),否則項目運行會報錯
3. RPC 服務(wù)端
首先,我們來看一下RPC服務(wù)端的代碼。
package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* rpc服務(wù)端
*/
public class RPCServer {
// 定義請求隊列常量
private final static String REQUEST_QUEUE_NAME = "rpc_queue";
// 定義回復(fù)隊列常量
private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
/**
* 服務(wù)端啟動入口
*/
public static void main(String[] args) {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 聲明請求隊列和回復(fù)隊列
channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
// 每次僅接收一條未經(jīng)確認(rèn)的消息
channel.basicQos(1);
// 構(gòu)建消費者屬性
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 獲取消息
String message = new String(body, StandardCharsets.UTF_8);
// 進行業(yè)務(wù)處理構(gòu)建響應(yīng)數(shù)據(jù) (這里做字符串拼接模擬響應(yīng)數(shù)據(jù))
String response = message + ":::";
// 構(gòu)造響應(yīng)基本屬性
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()) // 設(shè)置關(guān)聯(lián)id
.build();
// 發(fā)送響應(yīng)數(shù)據(jù)到回復(fù)隊列 (rpc_reply_queue)
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
// 手動回執(zhí)消息確認(rèn)消費
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 消費rpc_queue隊列的消息
channel.basicConsume(REQUEST_QUEUE_NAME, false, consumer);
// 持續(xù)監(jiān)聽請求消息
while (true) {
Thread.sleep(50);
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
}在RPC服務(wù)端的代碼中,我們首先創(chuàng)建了一個連接和一個通道,然后聲明了請求隊列和回復(fù)隊列。我們設(shè)置每一次只接收一條未確認(rèn)的消息,并創(chuàng)建了一個消費者對象,用于處理接收到的消息。
在handleDelivery方法中,我們從消息中獲取請求數(shù)據(jù),并進行業(yè)務(wù)處理,然后構(gòu)造響應(yīng)數(shù)據(jù)并發(fā)送到回復(fù)隊列。最后,我們手動確認(rèn)消費,并繼續(xù)監(jiān)聽請求消息。
4. RPC 客戶端
接下來,我們看一下RPC客戶端的代碼。
package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* rpc客戶端
*/
public class RPCClient {
// 定義請求隊列常量
private final static String REQUEST_QUEUE_NAME = "rpc_queue";
// 定義回復(fù)隊列常量
private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
/**
* 客戶端啟動入口
*/
public static void main(String[] args) {
System.out.println("Response1: " + call("Hello, RPC Server1"));
System.out.println("Response2: " + call("Hello, RPC Server2"));
System.out.println("Response3: " + call("Hello, RPC Server3"));
}
/**
* 發(fā)送請求到隊列并返回響應(yīng)數(shù)據(jù)
*
* @param message 請求消息
* @return 響應(yīng)數(shù)據(jù)
*/
public static String call(String message) {
// 創(chuàng)建連接
ConnectionFactory factory = new ConnectionFactory();
//factory.setHost("localhost");
String result = "";
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 聲明請求隊列和回復(fù)隊列
channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
// 每次僅接收一條未經(jīng)確認(rèn)的消息
channel.basicQos(1);
// 生成關(guān)聯(lián)id
String correlationId = UUID.randomUUID().toString();
// 構(gòu)造請求基本屬性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(correlationId) // 設(shè)置關(guān)聯(lián)id
.replyTo(REPLY_QUEUE_NAME) // 設(shè)置回復(fù)隊列 (rpc_reply_queue)
.build();
// 發(fā)送請求消息到rpc_queue隊列
channel.basicPublish("", REQUEST_QUEUE_NAME, props, message.getBytes(StandardCharsets.UTF_8));
// 用于保存響應(yīng)消息的阻塞隊列
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
// 監(jiān)聽回復(fù)隊列接收響應(yīng)消息
String consumerTag = channel.basicConsume(REPLY_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// 通過關(guān)聯(lián)id獲取到響應(yīng)數(shù)據(jù)
if (properties.getCorrelationId().equals(correlationId)) {
response.offer(new String(body, StandardCharsets.UTF_8));
}
}
});
// 清空阻塞隊列
response.clear();
// 等待接收響應(yīng)消息
result = response.take();
// 取消消費者的監(jiān)聽
channel.basicCancel(consumerTag);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
return result;
}
}在RPC客戶端的代碼中,我們同樣創(chuàng)建了一個連接和一個通信,并聲明了請求隊列和回復(fù)隊列。然后,我們設(shè)置每次只接收一條未確認(rèn)的消息,并創(chuàng)建了一個唯一的關(guān)聯(lián)id。
在call方法中,我們構(gòu)建了請求消息的基礎(chǔ)屬性,并將請求消息發(fā)送到請求隊列。接下來,我們創(chuàng)建了一個阻塞隊列,用于保存response響應(yīng)消息。
通過監(jiān)聽回復(fù)隊列接收響應(yīng)消息,并通過關(guān)聯(lián)id接收到對應(yīng)的響應(yīng)數(shù)據(jù),然后將其放入response隊列中。最后,通過阻塞隊列接收響應(yīng)消息,并返回響應(yīng)數(shù)據(jù)。
5. 運行代碼
現(xiàn)在,我們可以運行RPC服務(wù)端和客戶端的代碼了。首先運行服務(wù)端代碼,它會啟動一個監(jiān)聽請求的進程。然后,運行客戶端代碼,它會發(fā)送請求消息并等待接收響應(yīng)消息。
最終會看到客戶端發(fā)出了三次請求,并打印了對應(yīng)的響應(yīng)數(shù)據(jù)。

項目地址:rabbitmq-rpc: RabbitMQ實現(xiàn)RPC遠程調(diào)用功能練習(xí)Demo (gitee.com)
以上就是SpringBoot整合RabbitMQ實現(xiàn)RPC遠程調(diào)用功能的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ RPC遠程調(diào)用的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Kafka常用命令之kafka-console-consumer.sh解讀
這篇文章主要介紹了Kafka常用命令之kafka-console-consumer.sh解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03
Java使用組合模式實現(xiàn)表示公司組織結(jié)構(gòu)功能示例
這篇文章主要介紹了Java使用組合模式實現(xiàn)表示公司組織結(jié)構(gòu)功能,簡單描述了組合模式的概念、功能并結(jié)合實例形式分析了Java使用組合模式實現(xiàn)公司組織結(jié)構(gòu)表示功能具體操作步驟與相關(guān)注意事項,需要的朋友可以參考下2018-05-05
Java Chassis3過載狀態(tài)下的快速失敗解決分析
本文解密了Java Chassis 3快速失敗相關(guān)的機制和背后故事,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01
Java?Collection接口中的常用方法總結(jié)
這篇文章將大概用代碼案例簡單總結(jié)一下?Collection?接口中的一些方法,我們會以他的實現(xiàn)類?Arraylist?為例創(chuàng)建對象??煲黄饋砜纯窗?/div> 2022-12-12最新評論

