RabbitMQ消費端單線程與多線程案例講解
?? 一、基礎(chǔ)概念
模型 | 消費者數(shù)量 | 每個消費者內(nèi)部線程數(shù) | 順序性 | 場景說明 |
---|---|---|---|---|
單消費者單線程 | 1 | 1 | ? 保序 | 處理邏輯簡單,保證順序的常見場景 |
單消費者多線程 | 1 | >1 | ? 不保序 | 提升處理能力,放棄順序要求 |
多消費者單線程 | >1 | 1 | ? 不保序 | 多個隊列/分區(qū)消費,提升并發(fā) |
多消費者多線程 | >1 | >1 | ? 不保序 | 高并發(fā)場景下批量處理,放棄順序 |
concurrency# 初始消費者線程數(shù) max-concurrency# 最大消費者線程數(shù) prefetch# 每個消費者預(yù)取的消息數(shù)
concurrency: 2
- 表示初始創(chuàng)建的消費者線程數(shù)量
- 系統(tǒng)啟動時會立即創(chuàng)建 2 個消費者線程
- 這些線程會持續(xù)監(jiān)聽消息隊列
max-concurrency: 2
- 表示允許的最大消費者線程數(shù)量
- 這里設(shè)置為 2(與 concurrency 相同),表示線程數(shù)不會動態(tài)擴展
- 如果設(shè)置 max-concurrency > concurrency,系統(tǒng)會在負載高時動態(tài)增加消費者
詳細解釋:
concurrency和max-concurrency
不會影響每個消費者是否是多線程執(zhí)行,只會導(dǎo)致有多個消費者線程,只有用線程池才會導(dǎo)致每個消費者多線程消費
而沒有用線程池,也設(shè)置prefetch是因為消息被大量預(yù)取,單線程處理不過來時堆積等待,單線程并不會影響消息的順序性,只有使用了線程池才會影響
使用了線程池一定會導(dǎo)致消息順序性問題這與設(shè)不設(shè)置prefetch無關(guān),因為使用線程池后,任務(wù)交個線程池就返回了屬于異步
舉個例子:
1. RabbitMQ 給消費者推送消息1,消費者收到,提交給線程池任務(wù)A(耗時長)。
2. 消費者馬上ACK消息1(因為業(yè)務(wù)交給線程池了,自己處理完畢的感覺
3. RabbitMQ 再給消費者推送消息2,消費者收到,提交給線程池任務(wù)B(耗時短)。
4. R線程池調(diào)度先跑完任務(wù)B,后跑任務(wù)A。
? 單消費者 + 單線程消費
- 保證順序:消費者內(nèi)部串行執(zhí)行。
- 配置關(guān)鍵:
spring: rabbitmq: listener: simple: concurrency: 1 max-concurrency: 1 prefetch: 1
消費者代碼:
@Component public class MultiConsumerSingleThread { @RabbitListener(queues = "order_queue", concurrency = "2") public void receive(String message) { System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
? 單消費者 + 多線程消費
- 不保順序:一個消費者使用線程池異步處理消息。
- 配置關(guān)鍵:默認(rèn)配置 + 手動異步處理
- 消費者代碼:
@Component public class MultiThreadConsumer { private final ExecutorService executor = Executors.newFixedThreadPool(5); @RabbitListener(queues = "order_queue") public void receive(String message) { executor.submit(() -> { System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message); try { Thread.sleep(500); // 模擬耗時 } catch (InterruptedException e) { e.printStackTrace(); } }); } }
說明:消息提交到線程池,先到的不一定先處理完成,順序可能亂。
? 多消費者 + 單線程消費
- 不保順序:多個消費者實例輪詢分配消息,各自順序保留,但整體順序錯亂。
- 配置關(guān)鍵:
spring: rabbitmq: listener: simple: concurrency: 2 max-concurrency: 2 prefetch: 1
消費者代碼(共享類,也可拆成多個類模擬多實例):
@Component public class MultiConsumerSingleThread { //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但優(yōu)先級更高。 @RabbitListener(queues = "order_queue", concurrency = "2") public void receive(String message) { System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
? 多消費者 + 多線程消費
- 不保順序:每個消費者又使用線程池異步處理消息,最大吞吐量模式。
- 適合場景:數(shù)據(jù)導(dǎo)入、日志收集、發(fā)送通知等對順序無要求的批量處理。
- 配置關(guān)鍵:
spring: rabbitmq: listener: simple: concurrency: 3 max-concurrency: 3 prefetch: 10
消費者代碼:
@Component public class MultiConsumerMultiThread { private final ExecutorService executor = Executors.newFixedThreadPool(10); @RabbitListener(queues = "order_queue", concurrency = "3") public void receive(String message) { executor.submit(() -> { System.out.println("?? [線程:" + Thread.currentThread().getName() + "] 收到消息:" + message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
?? 補充說明
concurrency
: 控制并發(fā)消費者數(shù)量,等于消費者數(shù)。prefetch
: 控制每個消費者本地最多拉取多少條消息(如 1 表示嚴(yán)格串行處理)。- 每個
@RabbitListener
本質(zhì)上是一個容器,可以通過concurrency
配置“實例個數(shù)”。
到此這篇關(guān)于RabbitMQ消費端單線程與多線程的文章就介紹到這了,更多相關(guān)RabbitMQ單線程與多線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Java實現(xiàn)簡單搭建內(nèi)網(wǎng)穿透
內(nèi)網(wǎng)穿透是一種網(wǎng)絡(luò)技術(shù),適用于需要遠程訪問本地部署服務(wù)的場景,本文主要為大家介紹了如何使用Java實現(xiàn)簡單搭建內(nèi)網(wǎng)穿透,感興趣的可以了解下2024-02-02Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法
這篇文章主要介紹了Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-12-12java 多線程Thread與runnable的區(qū)別
這篇文章主要介紹了java 多線程Thread與runnable的區(qū)別的相關(guān)資料,java線程有兩種方法繼承thread類與實現(xiàn)runnable接口,下面就提供實例幫助大家理解,需要的朋友可以參考下2017-08-08MyBatis動態(tài)創(chuàng)建表的實例代碼
在項目需求中,我們經(jīng)常會遇到動態(tài)操作數(shù)據(jù)表的需求,常見的我們會把日志、設(shè)備實時位置信息等存入數(shù)據(jù)表,并且以一定時間段生成一個表來存儲。接下來通過本文給大家介紹MyBatis動態(tài)創(chuàng)建表的方法,感興趣的朋友一起看看吧2018-07-07解決 java.lang.NoSuchMethodError的錯誤
這篇文章主要介紹了解決 java.lang.NoSuchMethodError的錯誤的相關(guān)資料,需要的朋友可以參考下2017-06-06