欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ工作模式中的RPC通信模式詳解

 更新時間:2025年06月03日 11:49:56   作者:新綠MEHO  
在RabbitMQ中,RPC模式通過消息隊列實現(xiàn)遠程調(diào)用功能,這篇文章給大家介紹RabbitMQ工作模式之RPC通信模式,感興趣的朋友一起看看吧

RPC通信模式

概述

在RabbitMQ中,RPC模式通過消息隊列實現(xiàn)遠程調(diào)用功能??蛻舳耍ㄉa(chǎn)者)發(fā)送消息到消費隊列,服務端(消費者)進行消息消費并執(zhí)行相應的程序,然后將結果發(fā)送到回調(diào)隊列供客戶端使用。這是一種雙向的生產(chǎn)消費模式,其中客戶端既是生產(chǎn)者又是消費者,服務端則專注于處理消息并生成響應。

在RPC通信的過程中, 沒有?產(chǎn)者和消費者, ?較像咱們RPC遠程調(diào)?, ?概就是通過兩個隊列實現(xiàn)了?個可回調(diào)的過程.

工作流程

1.客戶端發(fā)送請求:

客戶端連接到RabbitMQ服務器。
客戶端聲明一個用于發(fā)送RPC請求的隊列(通常是固定的,如rpc_queue)。
客戶端創(chuàng)建一個臨時的回調(diào)隊列,并在發(fā)送請求時,將回調(diào)隊列的名稱作為消息屬性(reply_to)發(fā)送給交換機。
客戶端為每個請求生成一個唯一的correlation_id,并將其作為消息屬性發(fā)送,以便在接收響應時能夠匹配請求與響應。

2.交換機路由請求:

交換機接收到RPC請求后,根據(jù)路由鍵將請求路由到服務端監(jiān)聽的隊列。

3.服務端處理請求:

服務端(消費者)從隊列中接收請求。
服務端處理請求,并生成響應。
服務端將響應發(fā)送到客戶端指定的回調(diào)隊列,并在消息屬性中設置相同的correlation_id。

4.客戶端接收響應:

客戶端監(jiān)聽其回調(diào)隊列以接收響應。
當接收到響應時,客戶端檢查correlation_id以確定響應是否與之前的請求匹配。
如果匹配,客戶端處理響應;如果不匹配,客戶端可能丟棄該響應。

特點

1.解耦:客戶端和服務端之間不需要直接通信,降低了系統(tǒng)間的耦合度。
2.靈活性:支持多種語言和平臺之間的遠程調(diào)用。
3.可擴展性:通過增加服務端(消費者)的數(shù)量,可以輕松擴展RPC服務。
4.性能開銷:由于涉及到網(wǎng)絡傳輸和消息隊列的處理,RPC調(diào)用的性能通常低于本地調(diào)用。
5.復雜性:需要處理消息隊列的可靠性、持久性、消息確認等復雜問題。
6.安全性:遠程調(diào)用可能面臨更多的安全風險,如消息篡改、中間人攻擊等。

應用場景

RabbitMQ的RPC通信模式適用于需要遠程調(diào)用服務的場景,如分布式系統(tǒng)中的服務調(diào)用、微服務架構中的服務通信等。通過RabbitMQ的消息隊列機制,可以實現(xiàn)跨系統(tǒng)、跨語言的遠程調(diào)用,提高系統(tǒng)的靈活性和可擴展性。

代碼案例

引入依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.21.0</version>
</dependency>

常量類

public class Constants {
    public static final String HOST = "47.98.109.138";
    public static final int PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "aaa";
    //rpc 模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}

編寫客戶端代碼

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * rpc 客戶端
 * 1. 發(fā)送請求
 * 2. 接收響應
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
        connectionFactory.setUsername(Constants.USER_NAME);//賬號
        connectionFactory.setPassword(Constants.PASSWORD);  //密碼
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機
        Connection connection = connectionFactory.newConnection();
        //2. 開啟信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3. 發(fā)送請求
        String msg = "hello rpc...";
        //設置請求的唯一標識
        String correlationID = UUID.randomUUID().toString();
        //設置請求的相關屬性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
        //4. 接收響應
        //使用阻塞隊列, 來存儲響應信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回調(diào)消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校驗一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 響應結果]:"+ result);
    }
}

編寫服務端代碼

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * RPC server
 * 1. 接收請求
 * 2. 發(fā)送響應
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
        connectionFactory.setUsername(Constants.USER_NAME);//賬號
        connectionFactory.setPassword(Constants.PASSWORD);  //密碼
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機
        Connection connection = connectionFactory.newConnection();
        //2. 開啟信道
        Channel channel = connection.createChannel();
        //3. 接收請求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到請求:"+ request);
                String response = "針對request:"+ request +", 響應成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

運行程序(先運行客戶端,再運行服務端)

可以在管理界面看到其中一個隊列中有1條消息

我們可以看到,服務端接收到了消息并給客戶端發(fā)送了響應,與預期符合。

到此這篇關于RabbitMQ工作模式之RPC通信模式的文章就介紹到這了,更多相關RabbitMQ RPC通信模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot如何統(tǒng)一清理數(shù)據(jù)

    SpringBoot如何統(tǒng)一清理數(shù)據(jù)

    這篇文章主要介紹了SpringBoot如何統(tǒng)一清理數(shù)據(jù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • jmeter設置全局變量與正則表達式提取器過程圖解

    jmeter設置全局變量與正則表達式提取器過程圖解

    這篇文章主要介紹了jmeter設置全局變量與正則表達式提取器過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-10-10
  • Java Zip文件讀寫操作詳解

    Java Zip文件讀寫操作詳解

    這篇文章主要為大家詳細介紹了如何利用Java ZipInputstream、ZipOutputStream實現(xiàn)獲取每個文件中的內(nèi)容與寫入內(nèi)容,感興趣的可以動手嘗試一下
    2022-11-11
  • Spring事務管理詳細講解

    Spring事務管理詳細講解

    事務的作用就是為了保證用戶的每一個操作都是可靠的,事務中的每一步操作都必須成功執(zhí)行,只要有發(fā)生異常就?回退到事務開始未進行操作的狀態(tài)。事務管理是Spring框架中最為常用的功能之一,我們在使用Spring?Boot開發(fā)應用時,大部分情況下也都需要使用事務
    2022-10-10
  • lambdaQueryWrapper多條件嵌套查詢方式

    lambdaQueryWrapper多條件嵌套查詢方式

    這篇文章主要介紹了lambdaQueryWrapper多條件嵌套查詢方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教。
    2022-01-01
  • 詳解分別用Kotlin和java寫RecyclerView的示例

    詳解分別用Kotlin和java寫RecyclerView的示例

    本篇文章主要介紹了詳解分別用Kotlin和java寫RecyclerView的示例,詳解分別用Kotlin和java寫RecyclerView的示例
    2017-12-12
  • Springboot+Shiro+Jwt實現(xiàn)權限控制的項目實踐

    Springboot+Shiro+Jwt實現(xiàn)權限控制的項目實踐

    如今的互聯(lián)網(wǎng)已經(jīng)成為前后端分離的時代,所以本文在使用SpringBoot整合Shiro框架的時候會聯(lián)合JWT一起搭配使用,具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • 詳解mybatis中的if-else的嵌套使用

    詳解mybatis中的if-else的嵌套使用

    本文主要介紹了mybatis中的if-else的嵌套使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • Java Process.waitFor()方法詳解

    Java Process.waitFor()方法詳解

    這篇文章主要介紹了Java Process.waitFor()方法詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 基于Tomcat7、Java、WebSocket的服務器推送聊天室實例

    基于Tomcat7、Java、WebSocket的服務器推送聊天室實例

    HTML5 WebSocket實現(xiàn)了服務器與瀏覽器的雙向通訊,本篇文章主要介紹了基于Tomcat7、Java、WebSocket的服務器推送聊天室實例,具有一定的參考價值,有興趣的可以了解一下。
    2016-12-12

最新評論