欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RabbitMQ實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用功能

 更新時(shí)間:2023年06月12日 11:40:45   作者:霽晨晨晨  
在分布式系統(tǒng)中,RPC(Remote?Procedure?Call)是一種常用的通信機(jī)制,它可以讓不同的節(jié)點(diǎn)之間像調(diào)用本地函數(shù)一樣進(jìn)行函數(shù)調(diào)用,隱藏了底層的網(wǎng)絡(luò)通信細(xì)節(jié),通過(guò)本教程,你可以了解RPC的基本原理以及如何使用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的RPC客戶(hù)端和服務(wù)端

1. 交互過(guò)程

  • 啟動(dòng) RPC 服務(wù)端和客戶(hù)端,創(chuàng)建連接和通道。
  • 聲明請(qǐng)求隊(duì)列和回復(fù)隊(duì)列,確保使用相同的隊(duì)列名稱(chēng)。
  • 客戶(hù)端發(fā)送請(qǐng)求:客戶(hù)端將請(qǐng)求消息發(fā)送到指定的請(qǐng)求隊(duì)列中。
  • 服務(wù)端監(jiān)聽(tīng)請(qǐng)求隊(duì)列:服務(wù)端在指定的請(qǐng)求隊(duì)列上監(jiān)聽(tīng)請(qǐng)求消息。
  • 服務(wù)端接收請(qǐng)求:服務(wù)端接收到客戶(hù)端發(fā)送的請(qǐng)求消息。
  • 服務(wù)端處理請(qǐng)求:服務(wù)端根據(jù)請(qǐng)求消息中的參數(shù),執(zhí)行相應(yīng)的業(yè)務(wù)邏輯,并得到處理結(jié)果。
  • 服務(wù)端發(fā)送響應(yīng):服務(wù)端將處理結(jié)果作為響應(yīng)消息發(fā)送到客戶(hù)端指定的回復(fù)隊(duì)列中。
  • 客戶(hù)端監(jiān)聽(tīng)響應(yīng)隊(duì)列:客戶(hù)端在指定的回復(fù)隊(duì)列上監(jiān)聽(tīng)響應(yīng)消息。
  • 客戶(hù)端接收響應(yīng):客戶(hù)端接收到服務(wù)端發(fā)送的響應(yīng)消息。
  • 客戶(hù)端處理響應(yīng):客戶(hù)端根據(jù)響應(yīng)消息中的結(jié)果進(jìn)行相應(yīng)的處理。

2. 導(dǎo)入依賴(lài)

創(chuàng)建一個(gè)SpringBoot項(xiàng)目并導(dǎo)入依賴(lài)坐標(biāo)

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.10.RELEASE</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.12.0</version>
    </dependency>
</dependencies>

application.yml

spring:
  rabbitmq:
    host: localhost # 主機(jī)名
    port: 5672 # 端口
    username: guest # 用戶(hù)名
    password: guest # 密碼
    virtual-host: /
    template:
      receive-timeout: 2000
      reply-timeout: 2000
    listener:
      simple:
        concurrency: 1
        max-concurrency: 3
        prefetch: 1 # 消費(fèi)者每次只能預(yù)取1條數(shù)據(jù)到內(nèi)存并處理,默認(rèn)為250條
        acknowledge-mode: manual # 確定機(jī)制 manual:手動(dòng)確認(rèn)
    publisher-returns: true
    publisher-confirm-type: correlated

注意:需要提前開(kāi)啟RabbitMQ服務(wù),否則項(xiàng)目運(yùn)行會(huì)報(bào)錯(cuò)

3. RPC 服務(wù)端

首先,我們來(lái)看一下RPC服務(wù)端的代碼。

package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * rpc服務(wù)端
 */
public class RPCServer {
    // 定義請(qǐng)求隊(duì)列常量
    private final static String REQUEST_QUEUE_NAME = "rpc_queue";
    // 定義回復(fù)隊(duì)列常量
    private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
    /**
     * 服務(wù)端啟動(dòng)入口
     */
    public static void main(String[] args) {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明請(qǐng)求隊(duì)列和回復(fù)隊(duì)列
            channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
            channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
            // 每次僅接收一條未經(jīng)確認(rèn)的消息
            channel.basicQos(1);
            // 構(gòu)建消費(fèi)者屬性
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 獲取消息
                    String message = new String(body, StandardCharsets.UTF_8);
                    // 進(jìn)行業(yè)務(wù)處理構(gòu)建響應(yīng)數(shù)據(jù) (這里做字符串拼接模擬響應(yīng)數(shù)據(jù))
                    String response = message + ":::";
                    // 構(gòu)造響應(yīng)基本屬性
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId())   // 設(shè)置關(guān)聯(lián)id
                            .build();
                    // 發(fā)送響應(yīng)數(shù)據(jù)到回復(fù)隊(duì)列 (rpc_reply_queue)
                    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                    // 手動(dòng)回執(zhí)消息確認(rèn)消費(fèi)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 消費(fèi)rpc_queue隊(duì)列的消息
            channel.basicConsume(REQUEST_QUEUE_NAME, false, consumer);
            // 持續(xù)監(jiān)聽(tīng)請(qǐng)求消息
            while (true) {
                Thread.sleep(50);
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在RPC服務(wù)端的代碼中,我們首先創(chuàng)建了一個(gè)連接和一個(gè)通道,然后聲明了請(qǐng)求隊(duì)列和回復(fù)隊(duì)列。我們?cè)O(shè)置每一次只接收一條未確認(rèn)的消息,并創(chuàng)建了一個(gè)消費(fèi)者對(duì)象,用于處理接收到的消息。

在handleDelivery方法中,我們從消息中獲取請(qǐng)求數(shù)據(jù),并進(jìn)行業(yè)務(wù)處理,然后構(gòu)造響應(yīng)數(shù)據(jù)并發(fā)送到回復(fù)隊(duì)列。最后,我們手動(dòng)確認(rèn)消費(fèi),并繼續(xù)監(jiān)聽(tīng)請(qǐng)求消息。

4. RPC 客戶(hù)端

接下來(lái),我們看一下RPC客戶(hù)端的代碼。

package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * rpc客戶(hù)端
 */
public class RPCClient {
    // 定義請(qǐng)求隊(duì)列常量
    private final static String REQUEST_QUEUE_NAME = "rpc_queue";
    // 定義回復(fù)隊(duì)列常量
    private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
    /**
     * 客戶(hù)端啟動(dòng)入口
     */
    public static void main(String[] args) {
        System.out.println("Response1: " + call("Hello, RPC Server1"));
        System.out.println("Response2: " + call("Hello, RPC Server2"));
        System.out.println("Response3: " + call("Hello, RPC Server3"));
    }
    /**
     * 發(fā)送請(qǐng)求到隊(duì)列并返回響應(yīng)數(shù)據(jù)
     *
     * @param message 請(qǐng)求消息
     * @return 響應(yīng)數(shù)據(jù)
     */
    public static String call(String message) {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("localhost");
        String result = "";
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 聲明請(qǐng)求隊(duì)列和回復(fù)隊(duì)列
            channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
            channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
            // 每次僅接收一條未經(jīng)確認(rèn)的消息
            channel.basicQos(1);
            // 生成關(guān)聯(lián)id
            String correlationId = UUID.randomUUID().toString();
            // 構(gòu)造請(qǐng)求基本屬性
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationId)   // 設(shè)置關(guān)聯(lián)id
                    .replyTo(REPLY_QUEUE_NAME)      // 設(shè)置回復(fù)隊(duì)列 (rpc_reply_queue)
                    .build();
            // 發(fā)送請(qǐng)求消息到rpc_queue隊(duì)列
            channel.basicPublish("", REQUEST_QUEUE_NAME, props, message.getBytes(StandardCharsets.UTF_8));
            // 用于保存響應(yīng)消息的阻塞隊(duì)列
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
            // 監(jiān)聽(tīng)回復(fù)隊(duì)列接收響應(yīng)消息
            String consumerTag = channel.basicConsume(REPLY_QUEUE_NAME, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    // 通過(guò)關(guān)聯(lián)id獲取到響應(yīng)數(shù)據(jù)
                    if (properties.getCorrelationId().equals(correlationId)) {
                        response.offer(new String(body, StandardCharsets.UTF_8));
                    }
                }
            });
            // 清空阻塞隊(duì)列
            response.clear();
            // 等待接收響應(yīng)消息
            result = response.take();
            // 取消消費(fèi)者的監(jiān)聽(tīng)
            channel.basicCancel(consumerTag);
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

在RPC客戶(hù)端的代碼中,我們同樣創(chuàng)建了一個(gè)連接和一個(gè)通信,并聲明了請(qǐng)求隊(duì)列和回復(fù)隊(duì)列。然后,我們?cè)O(shè)置每次只接收一條未確認(rèn)的消息,并創(chuàng)建了一個(gè)唯一的關(guān)聯(lián)id。

在call方法中,我們構(gòu)建了請(qǐng)求消息的基礎(chǔ)屬性,并將請(qǐng)求消息發(fā)送到請(qǐng)求隊(duì)列。接下來(lái),我們創(chuàng)建了一個(gè)阻塞隊(duì)列,用于保存response響應(yīng)消息。

通過(guò)監(jiān)聽(tīng)回復(fù)隊(duì)列接收響應(yīng)消息,并通過(guò)關(guān)聯(lián)id接收到對(duì)應(yīng)的響應(yīng)數(shù)據(jù),然后將其放入response隊(duì)列中。最后,通過(guò)阻塞隊(duì)列接收響應(yīng)消息,并返回響應(yīng)數(shù)據(jù)。

5. 運(yùn)行代碼

現(xiàn)在,我們可以運(yùn)行RPC服務(wù)端和客戶(hù)端的代碼了。首先運(yùn)行服務(wù)端代碼,它會(huì)啟動(dòng)一個(gè)監(jiān)聽(tīng)請(qǐng)求的進(jìn)程。然后,運(yùn)行客戶(hù)端代碼,它會(huì)發(fā)送請(qǐng)求消息并等待接收響應(yīng)消息。

最終會(huì)看到客戶(hù)端發(fā)出了三次請(qǐng)求,并打印了對(duì)應(yīng)的響應(yīng)數(shù)據(jù)。

項(xiàng)目地址:rabbitmq-rpc: RabbitMQ實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用功能練習(xí)Demo (gitee.com)

以上就是SpringBoot整合RabbitMQ實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用功能的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ RPC遠(yuǎn)程調(diào)用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java web支持jsonp的實(shí)現(xiàn)代碼

    java web支持jsonp的實(shí)現(xiàn)代碼

    這篇文章主要介紹了java web支持jsonp的實(shí)現(xiàn)代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2018-11-11
  • java各種類(lèi)型對(duì)象占用內(nèi)存情況分析

    java各種類(lèi)型對(duì)象占用內(nèi)存情況分析

    這篇文章主要介紹了java各種類(lèi)型對(duì)象占用內(nèi)存情況分析,對(duì)內(nèi)存或者性能優(yōu)化感興趣的同學(xué),一定要看一下
    2021-04-04
  • Kafka常用命令之kafka-console-consumer.sh解讀

    Kafka常用命令之kafka-console-consumer.sh解讀

    這篇文章主要介紹了Kafka常用命令之kafka-console-consumer.sh解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 基于Process#waitFor()阻塞問(wèn)題的解決

    基于Process#waitFor()阻塞問(wèn)題的解決

    這篇文章主要介紹了Process#waitFor()阻塞問(wèn)題的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java使用組合模式實(shí)現(xiàn)表示公司組織結(jié)構(gòu)功能示例

    Java使用組合模式實(shí)現(xiàn)表示公司組織結(jié)構(gòu)功能示例

    這篇文章主要介紹了Java使用組合模式實(shí)現(xiàn)表示公司組織結(jié)構(gòu)功能,簡(jiǎn)單描述了組合模式的概念、功能并結(jié)合實(shí)例形式分析了Java使用組合模式實(shí)現(xiàn)公司組織結(jié)構(gòu)表示功能具體操作步驟與相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2018-05-05
  • Java Chassis3過(guò)載狀態(tài)下的快速失敗解決分析

    Java Chassis3過(guò)載狀態(tài)下的快速失敗解決分析

    本文解密了Java Chassis 3快速失敗相關(guān)的機(jī)制和背后故事,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2024-01-01
  • java實(shí)現(xiàn)文件上傳下載和圖片壓縮代碼示例

    java實(shí)現(xiàn)文件上傳下載和圖片壓縮代碼示例

    本文給大家介紹的是項(xiàng)目中經(jīng)常需要用到的一個(gè)常用的功能,使用java實(shí)現(xiàn)文件的上傳下載和圖片的壓縮功能,這里推薦給大家,有需要的小伙伴參考下。
    2015-03-03
  • 詳解如何在Java中使用阿里云對(duì)象存儲(chǔ)OSS

    詳解如何在Java中使用阿里云對(duì)象存儲(chǔ)OSS

    Java是世界上最流行的編程語(yǔ)言之一,擁有著廣泛的應(yīng)用場(chǎng)景和強(qiáng)大的生態(tài)系統(tǒng),阿里云對(duì)象存儲(chǔ) OSS 是一種企業(yè)級(jí)的云存儲(chǔ)服務(wù),本文將介紹如何在 Java 中使用阿里云對(duì)象存儲(chǔ) OSS,并寫(xiě)一點(diǎn)相應(yīng)的代碼示例供大家參考
    2023-06-06
  • spring boot發(fā)簡(jiǎn)單文本郵件案例

    spring boot發(fā)簡(jiǎn)單文本郵件案例

    這篇文章主要介紹了spring boot發(fā)簡(jiǎn)單文本郵件案例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • Java?Collection接口中的常用方法總結(jié)

    Java?Collection接口中的常用方法總結(jié)

    這篇文章將大概用代碼案例簡(jiǎn)單總結(jié)一下?Collection?接口中的一些方法,我們會(huì)以他的實(shí)現(xiàn)類(lèi)?Arraylist?為例創(chuàng)建對(duì)象??煲黄饋?lái)看看吧
    2022-12-12

最新評(píng)論