RabbitMQ工作模式中的RPC通信模式詳解
RPC通信模式
概述
在RabbitMQ中,RPC模式通過消息隊列實現(xiàn)遠程調(diào)用功能??蛻舳耍ㄉa(chǎn)者)發(fā)送消息到消費隊列,服務端(消費者)進行消息消費并執(zhí)行相應的程序,然后將結果發(fā)送到回調(diào)隊列供客戶端使用。這是一種雙向的生產(chǎn)消費模式,其中客戶端既是生產(chǎn)者又是消費者,服務端則專注于處理消息并生成響應。
在RPC通信的過程中, 沒有?產(chǎn)者和消費者, ?較像咱們RPC遠程調(diào)?, ?概就是通過兩個隊列實現(xiàn)了?個可回調(diào)的過程.
工作流程
1.客戶端發(fā)送請求:
客戶端連接到RabbitMQ服務器。
客戶端聲明一個用于發(fā)送RPC請求的隊列(通常是固定的,如rpc_queue)。
客戶端創(chuàng)建一個臨時的回調(diào)隊列,并在發(fā)送請求時,將回調(diào)隊列的名稱作為消息屬性(reply_to)發(fā)送給交換機。
客戶端為每個請求生成一個唯一的correlation_id,并將其作為消息屬性發(fā)送,以便在接收響應時能夠匹配請求與響應。
2.交換機路由請求:
交換機接收到RPC請求后,根據(jù)路由鍵將請求路由到服務端監(jiān)聽的隊列。
3.服務端處理請求:
服務端(消費者)從隊列中接收請求。
服務端處理請求,并生成響應。
服務端將響應發(fā)送到客戶端指定的回調(diào)隊列,并在消息屬性中設置相同的correlation_id。
4.客戶端接收響應:
客戶端監(jiān)聽其回調(diào)隊列以接收響應。
當接收到響應時,客戶端檢查correlation_id以確定響應是否與之前的請求匹配。
如果匹配,客戶端處理響應;如果不匹配,客戶端可能丟棄該響應。
特點
1.解耦:客戶端和服務端之間不需要直接通信,降低了系統(tǒng)間的耦合度。
2.靈活性:支持多種語言和平臺之間的遠程調(diào)用。
3.可擴展性:通過增加服務端(消費者)的數(shù)量,可以輕松擴展RPC服務。
4.性能開銷:由于涉及到網(wǎng)絡傳輸和消息隊列的處理,RPC調(diào)用的性能通常低于本地調(diào)用。
5.復雜性:需要處理消息隊列的可靠性、持久性、消息確認等復雜問題。
6.安全性:遠程調(diào)用可能面臨更多的安全風險,如消息篡改、中間人攻擊等。
應用場景
RabbitMQ的RPC通信模式適用于需要遠程調(diào)用服務的場景,如分布式系統(tǒng)中的服務調(diào)用、微服務架構中的服務通信等。通過RabbitMQ的消息隊列機制,可以實現(xiàn)跨系統(tǒng)、跨語言的遠程調(diào)用,提高系統(tǒng)的靈活性和可擴展性。
代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量類
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //rpc 模式 public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue"; }
編寫客戶端代碼
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; /** * rpc 客戶端 * 1. 發(fā)送請求 * 2. 接收響應 */ public class RpcClient { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前開放端口號 connectionFactory.setUsername(Constants.USER_NAME);//賬號 connectionFactory.setPassword(Constants.PASSWORD); //密碼 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機 Connection connection = connectionFactory.newConnection(); //2. 開啟信道 Channel channel = connection.createChannel(); channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null); //3. 發(fā)送請求 String msg = "hello rpc..."; //設置請求的唯一標識 String correlationID = UUID.randomUUID().toString(); //設置請求的相關屬性 AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constants.RPC_RESPONSE_QUEUE) .build(); channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes()); //4. 接收響應 //使用阻塞隊列, 來存儲響應信息 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回調(diào)消息: "+ respMsg); if (correlationID.equals(properties.getCorrelationId())){ //如果correlationID校驗一致 response.offer(respMsg); } } }; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer); String result = response.take(); System.out.println("[RPC Client 響應結果]:"+ result); } }
編寫服務端代碼
import com.rabbitmq.client.*; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * RPC server * 1. 接收請求 * 2. 發(fā)送響應 */ public class RpcServer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前開放端口號 connectionFactory.setUsername(Constants.USER_NAME);//賬號 connectionFactory.setPassword(Constants.PASSWORD); //密碼 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機 Connection connection = connectionFactory.newConnection(); //2. 開啟信道 Channel channel = connection.createChannel(); //3. 接收請求 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body,"UTF-8"); System.out.println("接收到請求:"+ request); String response = "針對request:"+ request +", 響應成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer); } }
運行程序(先運行客戶端,再運行服務端)
可以在管理界面看到其中一個隊列中有1條消息
我們可以看到,服務端接收到了消息并給客戶端發(fā)送了響應,與預期符合。
到此這篇關于RabbitMQ工作模式之RPC通信模式的文章就介紹到這了,更多相關RabbitMQ RPC通信模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot如何統(tǒng)一清理數(shù)據(jù)
這篇文章主要介紹了SpringBoot如何統(tǒng)一清理數(shù)據(jù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01詳解分別用Kotlin和java寫RecyclerView的示例
本篇文章主要介紹了詳解分別用Kotlin和java寫RecyclerView的示例,詳解分別用Kotlin和java寫RecyclerView的示例2017-12-12Springboot+Shiro+Jwt實現(xiàn)權限控制的項目實踐
如今的互聯(lián)網(wǎng)已經(jīng)成為前后端分離的時代,所以本文在使用SpringBoot整合Shiro框架的時候會聯(lián)合JWT一起搭配使用,具有一定的參考價值,感興趣的可以了解一下2023-09-09基于Tomcat7、Java、WebSocket的服務器推送聊天室實例
HTML5 WebSocket實現(xiàn)了服務器與瀏覽器的雙向通訊,本篇文章主要介紹了基于Tomcat7、Java、WebSocket的服務器推送聊天室實例,具有一定的參考價值,有興趣的可以了解一下。2016-12-12