欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ工作隊(duì)列模式的使用解析

 更新時(shí)間:2025年08月17日 16:57:43   作者:沒(méi)事學(xué)AI  
文章介紹了RabbitMQ工作隊(duì)列模式,通過(guò)多消費(fèi)者競(jìng)爭(zhēng)消費(fèi)消息實(shí)現(xiàn)負(fù)載均衡,對(duì)比簡(jiǎn)單模式突出其分布式處理優(yōu)勢(shì),詳解輪詢(xún)與公平分發(fā)策略,并提供環(huán)境配置、生產(chǎn)消費(fèi)代碼示例及運(yùn)行分析,最后強(qiáng)調(diào)消息確認(rèn)、持久化和動(dòng)態(tài)擴(kuò)容等使用技巧

一、工作隊(duì)列模式核心原理

1.1 模式定義與應(yīng)用場(chǎng)景

工作隊(duì)列模式(Work Queues)是RabbitMQ中一種基于生產(chǎn)者-消費(fèi)者模型的消息分發(fā)機(jī)制,其核心設(shè)計(jì)目標(biāo)是實(shí)現(xiàn)消息的負(fù)載均衡處理。當(dāng)系統(tǒng)中存在大量任務(wù)需要處理,且單個(gè)消費(fèi)者處理能力有限時(shí),通過(guò)引入多個(gè)消費(fèi)者共同消費(fèi)隊(duì)列中的消息,可顯著提升任務(wù)處理效率。

典型應(yīng)用場(chǎng)景包括:日志處理系統(tǒng)中多節(jié)點(diǎn)并行消費(fèi)日志消息、電商平臺(tái)訂單創(chuàng)建后多服務(wù)并行處理訂單信息(庫(kù)存扣減、物流通知等)、大數(shù)據(jù)任務(wù)調(diào)度中多worker節(jié)點(diǎn)協(xié)同處理計(jì)算任務(wù)等。

1.2 與簡(jiǎn)單模式的核心區(qū)別

簡(jiǎn)單模式中僅存在一個(gè)生產(chǎn)者一個(gè)消費(fèi)者,消息由唯一的消費(fèi)者串行處理;而工作隊(duì)列模式在保留單一生產(chǎn)者和單一隊(duì)列的基礎(chǔ)上,引入多個(gè)消費(fèi)者,消費(fèi)者之間形成競(jìng)爭(zhēng)關(guān)系——每條消息只能被其中一個(gè)消費(fèi)者處理,從而實(shí)現(xiàn)任務(wù)的分布式處理。

1.3 消息分發(fā)策略

RabbitMQ默認(rèn)采用輪詢(xún)(Round-Robin)策略分發(fā)消息:將隊(duì)列中的消息依次分配給各個(gè)消費(fèi)者,確保每個(gè)消費(fèi)者處理的消息數(shù)量大致均衡。例如,隊(duì)列中有10條消息,2個(gè)消費(fèi)者時(shí),消費(fèi)者1處理序號(hào)為0、2、4、6、8的消息,消費(fèi)者2處理序號(hào)為1、3、5、7、9的消息。

需注意的是,默認(rèn)策略不考慮消費(fèi)者的處理能力差異。若需根據(jù)消費(fèi)者處理速度動(dòng)態(tài)調(diào)整消息分配(如處理快的消費(fèi)者多分配消息),可通過(guò)設(shè)置prefetchCount參數(shù)實(shí)現(xiàn)公平分發(fā)(后續(xù)實(shí)戰(zhàn)案例中會(huì)詳細(xì)說(shuō)明)。

二、工作隊(duì)列模式實(shí)戰(zhàn)案例

2.1 環(huán)境準(zhǔn)備與依賴(lài)配置

2.1.1 開(kāi)發(fā)環(huán)境

  • JDK 1.8及以上
  • Maven 3.6+
  • RabbitMQ 3.9+(確保服務(wù)已啟動(dòng),默認(rèn)端口5672)

2.1.2 依賴(lài)引入

在Maven項(xiàng)目的pom.xml中添加RabbitMQ Java客戶(hù)端依賴(lài):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

2.1.3 常量類(lèi)定義

創(chuàng)建RabbitMQConstants類(lèi)統(tǒng)一管理連接信息和隊(duì)列名稱(chēng),避免硬編碼:

public class RabbitMQConstants {
    // RabbitMQ連接信息
    public static final String HOST = "localhost";
    public static final int PORT = 5672;
    public static final String USERNAME = "guest";
    public static final String PASSWORD = "guest";
    public static final String VIRTUAL_HOST = "/";
    
    // 工作隊(duì)列名稱(chēng)
    public static final String WORK_QUEUE_NAME = "work.queue";
}

2.2 生產(chǎn)者實(shí)現(xiàn)(發(fā)送任務(wù)消息)

生產(chǎn)者負(fù)責(zé)創(chuàng)建連接、聲明隊(duì)列并發(fā)送消息。以下示例中,生產(chǎn)者將發(fā)送10條帶有序號(hào)的消息,模擬需要處理的任務(wù):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitMQConstants.HOST);
        factory.setPort(RabbitMQConstants.PORT);
        factory.setUsername(RabbitMQConstants.USERNAME);
        factory.setPassword(RabbitMQConstants.PASSWORD);
        factory.setVirtualHost(RabbitMQConstants.VIRTUAL_HOST);
        
        // 2. 創(chuàng)建連接
        Connection connection = factory.newConnection();
        
        // 3. 創(chuàng)建通道
        Channel channel = connection.createChannel();
        
        // 4. 聲明隊(duì)列(參數(shù):隊(duì)列名稱(chēng)、是否持久化、是否排他、是否自動(dòng)刪除、額外參數(shù))
        channel.queueDeclare(RabbitMQConstants.WORK_QUEUE_NAME, false, false, false, null);
        
        // 5. 發(fā)送10條消息
        for (int i = 0; i < 10; i++) {
            String message = "hello work queue......" + i;
            // 發(fā)送消息(參數(shù):交換機(jī)名稱(chēng)、隊(duì)列名稱(chēng)、消息屬性、消息體)
            channel.basicPublish("", RabbitMQConstants.WORK_QUEUE_NAME, null, message.getBytes());
            System.out.println("生產(chǎn)者發(fā)送消息:" + message);
        }
        
        // 6. 關(guān)閉資源
        channel.close();
        connection.close();
    }
}

代碼說(shuō)明

  • 連接工廠通過(guò)ConnectionFactory配置RabbitMQ服務(wù)地址、端口及認(rèn)證信息;
  • 通道(Channel)是與RabbitMQ交互的核心接口,用于聲明隊(duì)列和發(fā)送消息;
  • queueDeclare方法聲明隊(duì)列時(shí),若隊(duì)列不存在則自動(dòng)創(chuàng)建;
  • basicPublish方法中,交換機(jī)名稱(chēng)為空表示使用默認(rèn)交換機(jī)(Direct Exchange),消息將直接路由到指定隊(duì)列。

2.3 消費(fèi)者實(shí)現(xiàn)(處理任務(wù)消息)

創(chuàng)建兩個(gè)消費(fèi)者類(lèi)WorkQueueConsumer1WorkQueueConsumer2,代碼結(jié)構(gòu)一致,僅通過(guò)打印信息區(qū)分不同消費(fèi)者:

2.3.1 消費(fèi)者1代碼

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 創(chuàng)建連接工廠(同生產(chǎn)者配置)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitMQConstants.HOST);
        factory.setPort(RabbitMQConstants.PORT);
        factory.setUsername(RabbitMQConstants.USERNAME);
        factory.setPassword(RabbitMQConstants.PASSWORD);
        factory.setVirtualHost(RabbitMQConstants.VIRTUAL_HOST);
        
        // 2. 創(chuàng)建連接
        Connection connection = factory.newConnection();
        
        // 3. 創(chuàng)建通道
        Channel channel = connection.createChannel();
        
        // 4. 聲明隊(duì)列(需與生產(chǎn)者隊(duì)列名稱(chēng)一致)
        channel.queueDeclare(RabbitMQConstants.WORK_QUEUE_NAME, false, false, false, null);
        
        // 5. 定義消息消費(fèi)回調(diào)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消費(fèi)者1接收到消息:" + message);
            // 模擬任務(wù)處理耗時(shí)(100ms)
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 手動(dòng)確認(rèn)消息已處理(參數(shù):消息標(biāo)識(shí)、是否批量確認(rèn))
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // 6. 取消消費(fèi)回調(diào)(可選)
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消費(fèi)者1取消消費(fèi)");
        };
        
        // 7. 消費(fèi)消息(參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)確認(rèn)、消息接收回調(diào)、取消消費(fèi)回調(diào))
        channel.basicConsume(RabbitMQConstants.WORK_QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

2.3.2 消費(fèi)者2代碼

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WorkQueueConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 連接配置與消費(fèi)者1一致
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitMQConstants.HOST);
        factory.setPort(RabbitMQConstants.PORT);
        factory.setUsername(RabbitMQConstants.USERNAME);
        factory.setPassword(RabbitMQConstants.PASSWORD);
        factory.setVirtualHost(RabbitMQConstants.VIRTUAL_HOST);
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitMQConstants.WORK_QUEUE_NAME, false, false, false, null);
        
        // 消息消費(fèi)回調(diào)(處理耗時(shí)模擬為200ms,與消費(fèi)者1形成差異)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消費(fèi)者2接收到消息:" + message);
            try {
                Thread.sleep(200); // 處理耗時(shí)更長(zhǎng)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消費(fèi)者2取消消費(fèi)");
        };
        
        channel.basicConsume(RabbitMQConstants.WORK_QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

代碼說(shuō)明

  • 消費(fèi)者需與生產(chǎn)者聲明相同的隊(duì)列,否則無(wú)法接收消息;
  • basicConsume方法通過(guò)DeliverCallback回調(diào)處理接收到的消息,CancelCallback用于處理消費(fèi)被取消的場(chǎng)景;
  • 示例中關(guān)閉了自動(dòng)消息確認(rèn)(autoAck=false),通過(guò)basicAck手動(dòng)確認(rèn)消息已處理,避免消息丟失;
  • 兩個(gè)消費(fèi)者通過(guò)Thread.sleep模擬不同的處理速度,為后續(xù)演示公平分發(fā)策略做準(zhǔn)備。

2.4 運(yùn)行結(jié)果與分析

2.4.1 輪詢(xún)策略下的消息分發(fā)

  • 先啟動(dòng)WorkQueueConsumer1WorkQueueConsumer2;
  • 再啟動(dòng)WorkQueueProducer發(fā)送10條消息;

觀察消費(fèi)者控制臺(tái)輸出:

  • 消費(fèi)者1接收消息:hello work queue......0、hello work queue......2、hello work queue......4、hello work queue......6hello work queue......8(偶數(shù)序號(hào));
  • 消費(fèi)者2接收消息:hello work queue......1、hello work queue......3、hello work queue......5hello work queue......7、hello work queue......9(奇數(shù)序號(hào))。

結(jié)論:默認(rèn)輪詢(xún)策略下,消息平均分配給消費(fèi)者,但未考慮處理能力差異(消費(fèi)者2處理速度慢卻分配了相同數(shù)量的消息)。

2.4.2 公平分發(fā)策略的實(shí)現(xiàn)

為解決輪詢(xún)策略的缺陷,通過(guò)設(shè)置prefetchCount=1實(shí)現(xiàn)公平分發(fā):消費(fèi)者處理完一條消息并確認(rèn)后,才會(huì)接收下一條消息。

在消費(fèi)者創(chuàng)建通道后添加以下代碼:

// 設(shè)置每次最多接收1條未確認(rèn)消息(公平分發(fā)關(guān)鍵配置)
channel.basicQos(1);

修改后重新運(yùn)行:

  • 消費(fèi)者1處理速度快,會(huì)分配更多消息(如處理6-7條);
  • 消費(fèi)者2處理速度慢,分配較少消息(如處理3-4條)。

結(jié)論basicQos(1)確保消費(fèi)者不會(huì)被分配超過(guò)其處理能力的消息,實(shí)現(xiàn)基于處理速度的動(dòng)態(tài)負(fù)載均衡。

三、工作隊(duì)列模式使用技巧與注意事項(xiàng)

3.1 消息確認(rèn)機(jī)制

  • 始終使用手動(dòng)消息確認(rèn)autoAck=false),并在消息處理完成后調(diào)用basicAck確認(rèn),避免消費(fèi)者崩潰導(dǎo)致消息丟失;
  • 若消息處理失敗,可調(diào)用basicNackbasicReject拒絕消息,根據(jù)業(yè)務(wù)需求決定是否重新入隊(duì)。

3.2 隊(duì)列持久化配置

為防止RabbitMQ服務(wù)重啟后隊(duì)列丟失,聲明隊(duì)列時(shí)設(shè)置durable=true

channel.queueDeclare(RabbitMQConstants.WORK_QUEUE_NAME, true, false, false, null);

同時(shí),發(fā)送消息時(shí)需設(shè)置消息持久化屬性:

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
        .deliveryMode(2) // 2表示持久化消息
        .build();
channel.basicPublish("", RabbitMQConstants.WORK_QUEUE_NAME, properties, message.getBytes());

3.3 消費(fèi)者動(dòng)態(tài)擴(kuò)容

工作隊(duì)列模式支持動(dòng)態(tài)增減消費(fèi)者:新增消費(fèi)者會(huì)自動(dòng)參與消息競(jìng)爭(zhēng),無(wú)需重啟生產(chǎn)者或修改隊(duì)列配置,適合應(yīng)對(duì)突發(fā)流量場(chǎng)景(如電商大促時(shí)臨時(shí)增加消費(fèi)者節(jié)點(diǎn))。

3.4 避免消息堆積

  • 合理設(shè)置消費(fèi)者數(shù)量,確保消費(fèi)速度大于生產(chǎn)速度;
  • 結(jié)合RabbitMQ的監(jiān)控工具(如Management Plugin)實(shí)時(shí)監(jiān)控隊(duì)列消息堆積情況,及時(shí)擴(kuò)容或排查消費(fèi)端問(wèn)題。

通過(guò)以上原理分析和實(shí)戰(zhàn)案例,相信讀者已掌握RabbitMQ工作隊(duì)列模式的核心用法。在實(shí)際開(kāi)發(fā)中,需根據(jù)業(yè)務(wù)場(chǎng)景選擇合適的消息分發(fā)策略,并做好消息可靠性保障和系統(tǒng)監(jiān)控,以構(gòu)建高效、穩(wěn)定的分布式消息處理系統(tǒng)。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • idea導(dǎo)入若依項(xiàng)目教程

    idea導(dǎo)入若依項(xiàng)目教程

    文章介紹了如何在IntelliJ?IDEA中導(dǎo)入若依管理系統(tǒng)項(xiàng)目,并詳細(xì)步驟包括克隆項(xiàng)目、修改配置文件、創(chuàng)建數(shù)據(jù)庫(kù)、運(yùn)行項(xiàng)目和前端展示
    2025-03-03
  • 解決Mybatis-Plus操作分頁(yè)后數(shù)據(jù)失效問(wèn)題

    解決Mybatis-Plus操作分頁(yè)后數(shù)據(jù)失效問(wèn)題

    這篇文章主要介紹了解決Mybatis-Plus操作分頁(yè)后數(shù)據(jù)失效問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-11-11
  • spring AMQP代碼生成rabbitmq的exchange and queue教程

    spring AMQP代碼生成rabbitmq的exchange and queue教程

    使用Spring AMQP代碼直接創(chuàng)建RabbitMQ exchange和queue,并確保綁定關(guān)系自動(dòng)成立,簡(jiǎn)化消息隊(duì)列配置,此經(jīng)驗(yàn)分享供參考,歡迎支持腳本之家
    2025-08-08
  • java list用法示例詳解

    java list用法示例詳解

    java中可變數(shù)組的原理就是不斷的創(chuàng)建新的數(shù)組,將原數(shù)組加到新的數(shù)組中,下文對(duì)java list用法做了詳解
    2014-01-01
  • spring?boot?Mybatis?攔截器實(shí)現(xiàn)拼接sql和修改的代碼詳解

    spring?boot?Mybatis?攔截器實(shí)現(xiàn)拼接sql和修改的代碼詳解

    這篇文章主要介紹了spring?boot?Mybatis?攔截器實(shí)現(xiàn)拼接sql和修改,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-05-05
  • Java中的Spring Security配置過(guò)濾器

    Java中的Spring Security配置過(guò)濾器

    這篇文章主要介紹了Java中的Spring Security配置過(guò)濾器,文章通過(guò)圍繞文章主題的相關(guān)資料展開(kāi)詳細(xì)內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-05-05
  • 根據(jù)list中對(duì)象的屬性去重和排序小結(jié)(必看篇)

    根據(jù)list中對(duì)象的屬性去重和排序小結(jié)(必看篇)

    下面小編就為大家?guī)?lái)一篇根據(jù)list中對(duì)象的屬性去重和排序小結(jié)(必看篇)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-05-05
  • 解決java.lang.IllegalArgumentException異常問(wèn)題

    解決java.lang.IllegalArgumentException異常問(wèn)題

    這篇文章主要介紹了解決java.lang.IllegalArgumentException異常問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • 使用JSCH框架通過(guò)跳轉(zhuǎn)機(jī)訪問(wèn)其他節(jié)點(diǎn)的方法

    使用JSCH框架通過(guò)跳轉(zhuǎn)機(jī)訪問(wèn)其他節(jié)點(diǎn)的方法

    下面小編就為大家分享一篇使用JSCH框架通過(guò)跳轉(zhuǎn)機(jī)訪問(wèn)其他節(jié)點(diǎn)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • Java中I/O輸入輸出的深入講解

    Java中I/O輸入輸出的深入講解

    Java的I/O技術(shù)可以將數(shù)據(jù)保存到文本文件、二進(jìn)制文件甚至是ZIP壓縮文件中,以達(dá)到永久性保存數(shù)據(jù)的要求,下面這篇文章主要給大家介紹了關(guān)于Java中I/O輸入輸出的相關(guān)資料,需要的朋友可以參考下
    2022-08-08

最新評(píng)論