欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ消費端單線程與多線程案例講解

 更新時間:2025年07月26日 09:30:25   作者:你我約定有三  
文章解析RabbitMQ消費端單線程與多線程處理機制,說明concurrency控制消費者數(shù)量,max-concurrency控制最大線程數(shù),prefetch影響消息預(yù)取量,強調(diào)線程池使用會導(dǎo)致順序混亂,適用于對順序無要求的批量處理場景,感興趣的朋友一起看看吧

?? 一、基礎(chǔ)概念

模型消費者數(shù)量每個消費者內(nèi)部線程數(shù)順序性場景說明
單消費者單線程11? 保序處理邏輯簡單,保證順序的常見場景
單消費者多線程1>1? 不保序提升處理能力,放棄順序要求
多消費者單線程>11? 不保序多個隊列/分區(qū)消費,提升并發(fā)
多消費者多線程>1>1? 不保序高并發(fā)場景下批量處理,放棄順序
concurrency# 初始消費者線程數(shù)
max-concurrency# 最大消費者線程數(shù)
prefetch# 每個消費者預(yù)取的消息數(shù)
  • concurrency: 2
    • 表示初始創(chuàng)建的消費者線程數(shù)量
    • 系統(tǒng)啟動時會立即創(chuàng)建 2 個消費者線程
    • 這些線程會持續(xù)監(jiān)聽消息隊列
  • max-concurrency: 2
    • 表示允許的最大消費者線程數(shù)量
    • 這里設(shè)置為 2(與 concurrency 相同),表示線程數(shù)不會動態(tài)擴展
    • 如果設(shè)置 max-concurrency > concurrency,系統(tǒng)會在負載高時動態(tài)增加消費者

詳細解釋:

       concurrency和max-concurrency不會影響每個消費者是否是多線程執(zhí)行,只會導(dǎo)致有多個消費者線程,只有用線程池才會導(dǎo)致每個消費者多線程消費

        而沒有用線程池,也設(shè)置prefetch是因為消息被大量預(yù)取,單線程處理不過來時堆積等待,單線程并不會影響消息的順序性,只有使用了線程池才會影響

        使用了線程池一定會導(dǎo)致消息順序性問題這與設(shè)不設(shè)置prefetch無關(guān),因為使用線程池后,任務(wù)交個線程池就返回了屬于異步

舉個例子:

                1. RabbitMQ 給消費者推送消息1,消費者收到,提交給線程池任務(wù)A(耗時長)。

                2. 消費者馬上ACK消息1(因為業(yè)務(wù)交給線程池了,自己處理完畢的感覺

                3.  RabbitMQ 再給消費者推送消息2,消費者收到,提交給線程池任務(wù)B(耗時短)。

                4. R線程池調(diào)度先跑完任務(wù)B,后跑任務(wù)A。

? 單消費者 + 單線程消費

  • 保證順序:消費者內(nèi)部串行執(zhí)行。
  • 配置關(guān)鍵
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        prefetch: 1

消費者代碼

@Component
public class MultiConsumerSingleThread {
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

? 單消費者 + 多線程消費

  • 不保順序:一個消費者使用線程池異步處理消息。
  • 配置關(guān)鍵:默認(rèn)配置 + 手動異步處理
  • 消費者代碼
@Component
public class MultiThreadConsumer {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    @RabbitListener(queues = "order_queue")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500); // 模擬耗時
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

說明:消息提交到線程池,先到的不一定先處理完成,順序可能亂。

? 多消費者 + 單線程消費

  • 不保順序:多個消費者實例輪詢分配消息,各自順序保留,但整體順序錯亂。
  • 配置關(guān)鍵
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 2
        max-concurrency: 2
        prefetch: 1

消費者代碼(共享類,也可拆成多個類模擬多實例)

@Component
public class MultiConsumerSingleThread {
    //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但優(yōu)先級更高。
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

? 多消費者 + 多線程消費

  • 不保順序:每個消費者又使用線程池異步處理消息,最大吞吐量模式。
  • 適合場景:數(shù)據(jù)導(dǎo)入、日志收集、發(fā)送通知等對順序無要求的批量處理。
  • 配置關(guān)鍵
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 3
        max-concurrency: 3
        prefetch: 10

消費者代碼

@Component
public class MultiConsumerMultiThread {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    @RabbitListener(queues = "order_queue", concurrency = "3")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

?? 補充說明

  • concurrency: 控制并發(fā)消費者數(shù)量,等于消費者數(shù)。
  • prefetch: 控制每個消費者本地最多拉取多少條消息(如 1 表示嚴(yán)格串行處理)。
  • 每個 @RabbitListener 本質(zhì)上是一個容器,可以通過 concurrency 配置“實例個數(shù)”。

到此這篇關(guān)于RabbitMQ消費端單線程與多線程的文章就介紹到這了,更多相關(guān)RabbitMQ單線程與多線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot中使用@Async注解失效場景及說明

    SpringBoot中使用@Async注解失效場景及說明

    在Spring?Boot中,@Async注解就像一把刀,能幫你輕松處理那些耗時的任務(wù),讓主線程可以繼續(xù)忙別的事兒,不過,跟所有強大的工具一樣,用不好它也可能出岔子,為了避免這些坑,咱們得深入了解下@Async注解,接下來,咱們就來聊聊7種常見的@Async失效情況,需要的朋友可以參考下
    2024-07-07
  • Java對int[]數(shù)組做新增刪除去重操作代碼

    Java對int[]數(shù)組做新增刪除去重操作代碼

    這篇文章主要介紹了Java里面對int[]數(shù)組做新增刪除去重實現(xiàn),這里記錄下使用int[]數(shù)組對數(shù)組進行新增刪除去重等操作,用來更加了解java里面的集合類思想,需要的朋友可以參考下
    2023-10-10
  • 詳解Java List的擴容機制原理及應(yīng)用

    詳解Java List的擴容機制原理及應(yīng)用

    在Java中,List是一種非常常用的數(shù)據(jù)結(jié)構(gòu),用于存儲有序的元素集合,本文將分析Java List的擴容機制原理,并通過示例代碼和測試代碼來加強闡述內(nèi)容,具有一定的參考價值,感興趣的可以了解一下
    2023-08-08
  • 使用Java實現(xiàn)簡單搭建內(nèi)網(wǎng)穿透

    使用Java實現(xiàn)簡單搭建內(nèi)網(wǎng)穿透

    內(nèi)網(wǎng)穿透是一種網(wǎng)絡(luò)技術(shù),適用于需要遠程訪問本地部署服務(wù)的場景,本文主要為大家介紹了如何使用Java實現(xiàn)簡單搭建內(nèi)網(wǎng)穿透,感興趣的可以了解下
    2024-02-02
  • Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法

    Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法

    這篇文章主要介紹了Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧
    2024-12-12
  • java 多線程Thread與runnable的區(qū)別

    java 多線程Thread與runnable的區(qū)別

    這篇文章主要介紹了java 多線程Thread與runnable的區(qū)別的相關(guān)資料,java線程有兩種方法繼承thread類與實現(xiàn)runnable接口,下面就提供實例幫助大家理解,需要的朋友可以參考下
    2017-08-08
  • scala中常用特殊符號詳解

    scala中常用特殊符號詳解

    這篇文章主要介紹了scala中常用特殊符號詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • MyBatis動態(tài)創(chuàng)建表的實例代碼

    MyBatis動態(tài)創(chuàng)建表的實例代碼

    在項目需求中,我們經(jīng)常會遇到動態(tài)操作數(shù)據(jù)表的需求,常見的我們會把日志、設(shè)備實時位置信息等存入數(shù)據(jù)表,并且以一定時間段生成一個表來存儲。接下來通過本文給大家介紹MyBatis動態(tài)創(chuàng)建表的方法,感興趣的朋友一起看看吧
    2018-07-07
  • 解決 java.lang.NoSuchMethodError的錯誤

    解決 java.lang.NoSuchMethodError的錯誤

    這篇文章主要介紹了解決 java.lang.NoSuchMethodError的錯誤的相關(guān)資料,需要的朋友可以參考下
    2017-06-06
  • springMVC發(fā)送郵件的簡單實現(xiàn)

    springMVC發(fā)送郵件的簡單實現(xiàn)

    本篇文章主要介紹了springMVC發(fā)送郵件的簡單實現(xiàn) ,主要是利用利用javax.mail發(fā)送郵件,圖片與附件都可發(fā)送,有興趣的可以了解一下
    2017-04-04

最新評論