RocketMQ中的消費模式和消費策略詳解
前言
首先明確一點,RocketMQ 是基于發(fā)布訂閱模型的消息中間件。所謂的發(fā)布訂閱就是說,consumer 訂閱了 broker 上的某個 topic,當 producer 發(fā)布消息到 broker 上的該 topic 時,consumer 就能收到該條消息。
之前我們講過 consumer group 的概念,即消費同一類消息的多個 consumer 實例組成一個消費者組,也可以稱為一個 consumer 集群,這些 consumer 實例使用同一個 group name。需要注意一點,除了使用同一個 group name,訂閱的 tag 也必須是一樣的,只有符合這兩個條件的 consumer 實例才能組成 consumer 集群。
1. 消費模式
1.1 集群消費
當 consumer 使用集群消費時,每條消息只會被 consumer 集群內(nèi)的任意一個 consumer 實例消費一次。舉個例子,當一個 consumer 集群內(nèi)有 3 個consumer 實例(假設為consumer 1、consumer 2、consumer 3)時,一條消息投遞過來,只會被consumer 1、consumer 2、consumer 3中的一個消費。
同時記住一點,使用集群消費的時候,consumer 的消費進度是存儲在 broker 上,consumer 自身是不存儲消費進度的。消息進度存儲在 broker 上的好處在于,當你 consumer 集群是擴大或者縮小時,由于消費進度統(tǒng)一在broker上,消息重復的概率會被大大降低了。
注意:在集群消費模式下,并不能保證每一次消息失敗重投都投遞到同一個 consumer 實例。
1.2 廣播消費
當 consumer 使用廣播消費時,每條消息都會被 consumer 集群內(nèi)所有的 consumer 實例消費一次,也就是說每條消息至少被每一個 consumer 實例消費一次。舉個例子,當一個 consumer 集群內(nèi)有 3 個 consumer 實例(假設為 consumer 1、consumer 2、consumer 3)時,一條消息投遞過來,會被 consumer 1、consumer 2、consumer 3都消費一次。
與集群消費不同的是,consumer 的消費進度是存儲在各個 consumer 實例上,這就容易造成消息重復。還有很重要的一點,對于廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關(guān)注消費失敗的情況。
雖然廣播消費能保證集群內(nèi)每個 consumer 實例都能消費消息,但是消費進度的維護、不具備消息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用集群消費,因為集群消費不僅擁有消費進度存儲的可靠性,還具有消息重投的機制。而且,我們通過集群消費也可以達到廣播消費的效果。
1.3 使用集群消費模擬廣播消費
如果業(yè)務上確實需要使用廣播消費,那么我們可以通過創(chuàng)建多個 consumer 實例,每個 consumer 實例屬于不同的 consumer group,但是它們都訂閱同一個 topic。舉個例子,我們創(chuàng)建 3 個 consumer 實例,consumer 1(屬于consumer group 1)、consumer 2(屬于 consumer group 2)、consumer 3(屬于consumer group 3),它們都訂閱了 topic A ,那么當 producer 發(fā)送一條消息到 topic A 上時,由于 3 個consumer 屬于不同的 consumer group,所以 3 個consumer都能收到消息,也就達到了廣播消費的效果了。 除此之外,每個 consumer 實例的消費邏輯可以一樣也可以不一樣,每個consumer group還可以根據(jù)需要增加 consumer 實例,比起廣播消費來說更加靈活。
2. 消費策略
Consumer在拉取消息之前需要對TopicMessage進行負載操作,負載操作由一個定時器來完成單位,定時間隔默認20s
簡單來說就是將Topic下的MessageQueue分配給這些Consumer,至于怎么分,就是通過這些負載策略定義的算法規(guī)則來劃分。
2.1 AllocateMessageQueueAveragely
平均負載策略,RocketMQ默認使用的就是這種方式,如果某個Consumer集群,訂閱了某個Topic,Topic下面的這些MessageQueue會被平均分配給集群中的Consumer,為了幫助大家理解,我畫了個圖
我來講下這個圖代表的意思,假設topic為 testMsg,該testMsg下面有4個MessageQueue,然后這些Consumer組成了一個集群(都監(jiān)聽了該Topic并且消費groupId是一樣的),首先會給Consumer和MessageQueue進行排序,誰是老大,誰先拿MessageQueue, 平均分配分為兩種情況
2.1.1 MessageQueue數(shù)量大于Consumer數(shù)量
如果隊列數(shù)量不是消費者數(shù)量的整數(shù)倍,跟上圖中2個Consumer和4個Consumer的情況一樣,先分每個Consumer應得的數(shù)量,拿2個Consumer來舉個例子,C1 和C2 各自分到了2個MessageQueue,C1排序時在C2前面,所以C1先把 Q0 和Q1拿走,C2再拿2個,也就是Q2和Q3。
如果隊列數(shù)量不是消費者的整數(shù)倍,跟上圖3個Consumer和5個Consumer的情況一樣,5個Comsumer的比較特殊,我們過會再講,我們拿3個Consumer的情況來舉例,4個消息隊列,每個Consumer能分到1個,還剩下1個,弱肉強食嘛,剩下的當然給排在前面的大哥C1啦,最后分下來,C1分到2個隊列,C2和C3只分到一個,分完數(shù)量以后,也是按照順序來拿,C1拿到了Q0和Q1,然后C2就只能從Q2開始拿,C3只能拿剩下的Q3啦
2.1.2 MessageQueue數(shù)量小于Consumer數(shù)量
這種情況平均下來,每個人1個Consumer都分不到一個,也就是我們上圖中的5個Comsumer的情況老規(guī)矩,按照排序順序,每人先拿1個隊列,由于C5排在最后面,隊列全被別人拿走了,C5就一直分不到消息隊列,除非前面的某個Consumer掛了,20s之后,在隊列重新負載的時候就能拿到MessageQueue。
具體算法:
//consumer的排序后的 int index = cidAll.indexOf(currentCID); //取模 int mod = mqAll.size() % cidAll.size(); //如果隊列數(shù)小于消費者數(shù)量,則將分到隊列數(shù)設置為1,如果余數(shù)大于當前消費者的index,則 //能分到的隊列數(shù)+1,否則就是平均值 int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); //consumer獲取第一個MessageQueue的索引 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 如果消費者大于隊列數(shù),rang會是負數(shù),循環(huán)也就不會執(zhí)行 int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); }
2.2 AllocateMessageQueueAveragelyByCircle
環(huán)形平均分配,這個和平均分配唯一的區(qū)別就是,再分隊列的時候,平均隊列是將屬于自己的MessageQueue全部拿走,而環(huán)形平均則是,一人拿一個,拿到的Queue不是連續(xù)的。我也畫了張圖來幫助大家理解
這種環(huán)形平均分配和平均分配,每個Consumer拿到的MessageQueue數(shù)量是不變的,我們就拿3個Consumer的情況舉個例子,
也是對Consumer和MessageQueue排序,先確定每個Consumer能拿到的MessageQueue數(shù)量,C1能分到2個,C2和C3只能分到1個
C1先拿1個,然后C2拿一個,C3拿一個,C1再拿一個。也就是圖上3個Consumer畫的這個情況。
另外,如果Consumer的數(shù)量大于消息隊列的數(shù)量,處理方式和平均分配時一樣的。
//當前consumer排序后的索引 int index = cidAll.indexOf(currentCID); //index會是consumer第一個拿到的消息隊列索引 for (int i = index; i < mqAll.size(); i++) { //這里采用了取模的方式 if (i % cidAll.size() == index) { result.add(mqAll.get(i)); } }
2.3 AllocateMessageQueueByConfig
用戶自定義配置,用戶在創(chuàng)建Consumer的時候,可以設置要使用的負載策略,如果我們設置為AllocateMessageQueueByConfig方式時,我們可以自己指定需要監(jiān)聽的MessageQueues,它維護了一個List messageQueueList,我們可以往這里面塞目標的MessageQueues,這個策略了解一下就行,用的不多,具體設置代碼如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic consumer.subscribe("testMsg","*"); //注冊消息監(jiān)聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // do job return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //用戶自定義queue策略 AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig(); //指定MessageQueue allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("testMsg","broker-a",0))); //設置consumer的負載策略 consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig); //啟動consumer consumer.start();
2.4 AllocateMessageQueueByMachineRoom
? 機房負載策略,其實這個策略就是當前Consumer只負載處在指定的機房內(nèi)的MessageQueue,還有brokerName的命名必須要按要求的格式來設置: 機房名@brokerName
我們先看下具體的使用
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic consumer.subscribe("testMsg","*"); //注冊消息監(jiān)聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // do job return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); AllocateMessageQueueByMachineRoom allocateMachineRoom = new AllocateMessageQueueByMachineRoom(); //指定機房名稱 machine_room1、machine_room2 allocateMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("machine_room1","machine_room2"))); //設置consumer的負載策略 consumer.setAllocateMessageQueueStrategy(allocateMachineRoom); //啟動consumer consumer.start();
總結(jié)一下源碼的意思:
- 首先賽選出當前Topic處在指定機房的隊列
- 賽選出隊列后,按照平均負載策略進行具體的分配(算法極其相似)
哈哈,發(fā)現(xiàn)總結(jié)的真簡潔呀,別慌,來張圖給你潤潤喉
其實這個策略就是對MessageQueue進行了過濾,過濾完了以后,后續(xù)操作就按照平均負載策略來進行具體負載操作。這個算法和平均負載的算法得到的結(jié)果是一樣的,我感覺這兩個策略應該是兩個人寫的,不然不會寫兩套不同的算法來實現(xiàn)一個功能。也不知道是不是我自己太差了,理解不到大佬的思路。
2.5 AllocateMachineRoomNearby
這個策略我個人感覺是AllocateMessageQueueByMachineRoom的改進版本,因為這個策略的處理方式要比AllocateMessageQueueByMachineRoom更加靈活,還考慮到了那些同機房只有MessageQueue卻沒有Consumer的情況,下面我們來具體講這個策略。使用該策略需要自己定義一個類,來區(qū)分每個broker處于哪個機房,該策略RocketMQ有個測試單元,我稍微改造了一下,就是把這個類提出來了。
public class MyMachineResolver implements AllocateMachineRoomNearby.MachineRoomResolver { /** * 判斷當前broker處于哪個機房 * @param messageQueue * @return */ @Override public String brokerDeployIn(MessageQueue messageQueue) { return messageQueue.getBrokerName().split("-")[0]; } /** * 判斷consumer處于哪個機房 * @param clientID * @return */ @Override public String consumerDeployIn(String clientID) { return clientID.split("-")[0]; } }
我們從代碼中,可以看出來需要在設置brokerName和Consumer的Id的時候需要加上機房名稱,eg:hz_aliyun_room1-broker-a、hz_aliyun_root1-Client1。我們先看一下在代碼里面怎么使用這個同機房分配策略
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic consumer.subscribe("testMsg","*"); //注冊消息監(jiān)聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // do job return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //用戶同機房分配策略 consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely() ,new MyMachineResolver())); //啟動consumer consumer.start();
我們可以看到,我們創(chuàng)建這個同機房分配策略的時候,還加了一個平均分配的策略進去,它本身就是一個策略,為啥還要傳另一個策略。(該策略只會講MessageQueue和Consumer按機房進行分組,分組以后具體的負載,就是通過我們傳的另外一個負載策略來分配的)我們到源碼里面去看,后面會解釋到
//消息隊列按機房分組 Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>(); for (MessageQueue mq : mqAll) { //這里調(diào)用我們自己定義的類方法,得到broker的機房的名稱 String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); //機房不為空,將broker放到分組中 if (StringUtils.isNoneEmpty(brokerMachineRoom)) { if (mr2Mq.get(brokerMachineRoom) == null) { mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>()); } mr2Mq.get(brokerMachineRoom).add(mq); } else { throw new IllegalArgumentException("Machine room is null for mq " + mq); } } //consumer按機房分組 Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>(); for (String cid : cidAll) { //這里調(diào)用我們自己定義的類方法,得到broker的機房的名稱 String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); if (StringUtils.isNoneEmpty(consumerMachineRoom)) { if (mr2c.get(consumerMachineRoom) == null) { mr2c.put(consumerMachineRoom, new ArrayList<String>()); } mr2c.get(consumerMachineRoom).add(cid); } else { throw new IllegalArgumentException("Machine room is null for consumer id " + cid); } } //當前consumer分到的所有MessageQueue List<MessageQueue> allocateResults = new ArrayList<MessageQueue>(); //1.給當前consumer分當前機房的那些MessageQeueue String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); //得到當前機房的MessageQueue List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); //得到當前機房的Consumer List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom); if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { //得到當前機房所有MessageQueue和Consumers后根據(jù)指定的策略再負載 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); } //2.如果該MessageQueue的機房 沒有同機房的consumer,將這些MessageQueue按配置好的備用策略分配給所有的consumer for (String machineRoom : mr2Mq.keySet()) { if (!mr2c.containsKey(machineRoom)) { //添加分配到的游離態(tài)MessageQueue allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll)); } }
總結(jié)一下源碼的意思
- 分別給MessageQueue和Consumer按機房分組
- 得到當前Consumer所在機房的所有Consumer和MessageQueue
- 通過設置的負載策略,再進行具體的負載,得到當前Consumer分到的MessageQueue
- 如果存在MessageQueue的某個機房中,沒有和MessageQueue同機房的Consumer,將這些MessageQueue按配置的負載策略分配給集群中所有的Consumer去負載
- 最終該Consumer分到的MessageQueue會包含同機房分配到的和部分游離態(tài)分配的
這里我也畫個圖來解釋一下,
先同機房的Consumer和MessageQueue進行負載,這里按照平均負載來分(我們創(chuàng)建機房就近策略使用的是平均負載),然后將游離態(tài)的通過設置的負載策略來分。
2.6 AllocateMessageQueueConsistentHash
一致性哈希策略,這里我簡單介紹一下一致性哈希,這里不好我先給個圖,我們再來解釋
一致性哈希有一個哈希環(huán)的概念,哈希環(huán)由數(shù)值 0到2^32-1 組成,不管內(nèi)容多長的字符,經(jīng)過哈希計算都能得到一個等長的數(shù)字,最后都會落在哈希環(huán)上的某個點,哈希環(huán)上的點都是虛擬的,比如我們這里使用Consumer的Id來進行哈希計算,得到的這幾個是物理的點,然后把得到的點存到TreeMap里面,然后將所有的MessageQueue依次進行同樣的哈希計算,得到距離MessageQueue順時針方向最近的那個Consumer點,這個就是MessageQeueu最終歸屬的那個Consumer。
我們看下源碼:
//將所有consumer變成節(jié)點 到時候經(jīng)過hash計算 分布在hash環(huán)上 Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); } final ConsistentHashRouter<ClientNode> router; //構(gòu)建哈希環(huán) if (customHashFunction != null) { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { //默認使用MD5進行Hash計算 router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt); } List<MessageQueue> results = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { //對messageQueue進行hash計算,找到順時針最近的consumer節(jié)點 ClientNode clientNode = router.routeNode(mq.toString()); //判斷是否是當前consumer if (clientNode != null && currentCID.equals(clientNode.getKey())) { results.add(mq); } }
到此這篇關(guān)于RocketMQ中的消費模式和消費策略詳解的文章就介紹到這了,更多相關(guān)RocketMQ消費模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot數(shù)據(jù)訪問和數(shù)據(jù)視圖的使用方式詳解
這篇文章主要為大家介紹了springboot數(shù)據(jù)訪問和數(shù)據(jù)視圖的使用方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-06-06Java后臺基于POST獲取JSON格式數(shù)據(jù)
這篇文章主要介紹了Java后臺基于POST獲取JSON格式數(shù)據(jù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03SpringBoot如何整合mybatis-generator-maven-plugin 1.4.0
這篇文章主要介紹了SpringBoot整合mybatis-generator-maven-plugin 1.4.0的實現(xiàn)方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-01-01MyBatis環(huán)境資源配置實現(xiàn)代碼詳解
這篇文章主要介紹了MyBatis環(huán)境資源配置實現(xiàn)代碼解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-08-08SpringMVC 接收前端傳遞的參數(shù)四種方式小結(jié)
這篇文章主要介紹了SpringMVC 接收前端傳遞的參數(shù)四種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10