RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解
消費(fèi)者類型概覽
在介紹不同的消息類型之前,先明確一下不同 RocketMQ 消費(fèi)者中的一個(gè)通用工作流程:在消費(fèi)者中,到達(dá)客戶端的消息都是由客戶端主動(dòng)向服務(wù)端請(qǐng)求并掛起長輪詢獲得的。
為了保證消息到達(dá)的及時(shí)性,客戶端需要不斷地向服務(wù)端發(fā)起請(qǐng)求(請(qǐng)求是否需要由客戶端主動(dòng)發(fā)起則與具體的客戶端類型有關(guān)),而新的符合條件的消息一旦到達(dá)服務(wù)端,就會(huì)客戶端請(qǐng)求走。最終根據(jù)客戶端處理的結(jié)果不同,服務(wù)端對(duì)消息的處理結(jié)果進(jìn)行記錄。
另外 PushConsumer 和 SimpleConsumer 中還會(huì)有一個(gè) ConsumerGroup 的概念,ConsumerGroup 相當(dāng)于是一組相同訂閱關(guān)系的消費(fèi)者的共同身份標(biāo)識(shí)。而服務(wù)端也會(huì)根據(jù) ConsumerGroup 來記錄對(duì)應(yīng)的消費(fèi)進(jìn)度。同一個(gè) ConsumerGroup 下的消息消費(fèi)者將共同消費(fèi)符合當(dāng)前訂閱組要求的所有消息,而不是獨(dú)立進(jìn)行消費(fèi)。相比較于 PullConsumer,PushConsumer 和 SimpleConsumer 更加適用于業(yè)務(wù)集成的場(chǎng)景,由服務(wù)端來托管消費(fèi)狀態(tài)和進(jìn)度,相對(duì)來說更加的輕量與簡單。
簡單來說:
- PushConsumer : 全托管的消費(fèi)者類型,用戶只需要注冊(cè)消息監(jiān)聽器即可,符合對(duì)應(yīng)訂閱關(guān)系的消息就會(huì)調(diào)用對(duì)應(yīng)的消費(fèi)方法,是與業(yè)務(wù)集成最為普遍的消費(fèi)者類型。
- SimpleConsumer: 解耦消息消費(fèi)與進(jìn)度同步的消費(fèi)者類型,用戶自主接受來自服務(wù)端的消息,并對(duì)單條消息進(jìn)行消息確認(rèn)。和 PushConsumer 一樣也由服務(wù)端托管消費(fèi)進(jìn)度,適用于用戶需要自主控制消費(fèi)速率的業(yè)務(wù)場(chǎng)景。
- PullConsumer: 使用流處理框架進(jìn)行管理的消費(fèi)者類型,用戶按照隊(duì)列(Topic 的最小邏輯組成單位)來進(jìn)行消息的接收并可以選擇自動(dòng)或者手動(dòng)提交消費(fèi)位點(diǎn)。
PushConsumer
PushConsumer 是 RocketMQ 目前使用最為廣泛的消費(fèi)者。用戶只需要確認(rèn)好訂閱關(guān)系之后,注冊(cè)相對(duì)應(yīng)的 Listener 即可。符合對(duì)應(yīng)訂閱關(guān)系的消息在由 Producer 發(fā)出后,消費(fèi)者的 Listener 接口也會(huì)被即時(shí)調(diào)用,那么此時(shí)用戶需要在 Listener 中去實(shí)現(xiàn)對(duì)應(yīng)的業(yè)務(wù)邏輯。
使用簡介
以下是 Push 消費(fèi)者的使用示例:
PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // set the consumer group name. .setConsumerGroup(consumerGroup) // set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // handle the received message and return consume result. LOGGER.info("consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // close the push consumer when you don't need it anymore. pushConsumer.close();
用戶需要根據(jù)自己業(yè)務(wù)處理結(jié)果的不同來返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。當(dāng)用戶返回 ConsumeResult.SUCCESS時(shí),消息則被視為消費(fèi)成功;當(dāng)用戶返回 ConsumeResult.FAILURE時(shí),則服務(wù)端視為消費(fèi)失敗,會(huì)進(jìn)行該條消息的退避重試,消息的退避重試是指,在消息被消費(fèi)成功之前,當(dāng)前消息會(huì)被多次投遞到用戶注冊(cè)的 MessageListener 中直到消費(fèi)成功,而兩次消費(fèi)之間的時(shí)間間隔則是符合退避規(guī)律的。
特別的,每個(gè) ConsumerGroup 都會(huì)有一個(gè)最大消費(fèi)次數(shù)的設(shè)置,如果當(dāng)前消息的消費(fèi)次數(shù)超過了這個(gè)設(shè)置,則消息不會(huì)再被投遞,轉(zhuǎn)而被投遞進(jìn)入死信隊(duì)列。這個(gè)消費(fèi)次數(shù)在消息每次被投遞到 MessageListener 時(shí)都會(huì)進(jìn)行自增。譬如:如果消息的最大消費(fèi)次數(shù)為 1,那么無論對(duì)于這條消息,當(dāng)前是被返回消費(fèi)成功還是消費(fèi)失敗,都只會(huì)被消費(fèi)這一次。
應(yīng)用場(chǎng)景與最佳實(shí)踐
PushConsumer 是一種近乎全托管的消費(fèi)者,這里的托管的含義在于用戶本身并不需要關(guān)心消息的接收,而只需要關(guān)注消息的消費(fèi)過程,除此之外的所有邏輯都在 Push 消費(fèi)者的實(shí)現(xiàn)中封裝掉了,用戶只需要根據(jù)每條收到的消息返回不同的消費(fèi)結(jié)果即可,因此也是最為普適的消費(fèi)者類型。
MessageListener 是針對(duì)單條消息設(shè)計(jì)的監(jiān)聽器接口:
/** * MessageListener is used only for the push consumer to process message consumption synchronously. * * <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the * backend thread pool to consumer message concurrently. */ public interface MessageListener { /** * The callback interface to consume the message. * * <p>You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}. * The consumption is successful only when {@link ConsumeResult#SUCCESS } is returned, null pointer is returned * or exception is thrown would cause message consumption failure too. */ ConsumeResult consume(MessageView messageView); }
絕大多數(shù)場(chǎng)景下,使用方應(yīng)該快速處理消費(fèi)邏輯并返回消費(fèi)成功,不宜長時(shí)間阻塞消費(fèi)邏輯。對(duì)于消費(fèi)邏輯比較重的情形,建議可以先行提交消費(fèi)狀態(tài),然后對(duì)消息進(jìn)行異步處理。
實(shí)際在 Push 消費(fèi)者的實(shí)現(xiàn)中,為了保證消息消費(fèi)的及時(shí)性,消息是會(huì)被預(yù)先拉取客戶端再進(jìn)行后續(xù)的消費(fèi)的,因此在客戶端中存在對(duì)已拉取消息大小的緩存。為了防止緩存的消息過多導(dǎo)致客戶端內(nèi)存泄漏,也提前預(yù)留了客戶端參數(shù)供使用者自行進(jìn)行設(shè)置。
// 設(shè)置本地最大緩存消息數(shù)目為 16 條 pushConsumer.setMaxCachedMessageCount(16); // 設(shè)置本地最大緩存消息占用內(nèi)存大小為 128 MB pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);
SimpleConsumer
相比較 PushConsumer,SimpleConsumer 則暴露了更多的細(xì)節(jié)給使用者。在 SimpleConsumer 中,用戶將自行控制消息的接收與處理。
使用簡介
以下是 SimpleConsumer 的使用示例:
SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for long-polling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); LOGGER.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } // Close the simple consumer when you don't need it anymore. consumer.close();
在 SimpleConsumer 中用戶需要自行進(jìn)行消息的拉取,這一動(dòng)作通過 SimpleConsumer#receive 這個(gè)接口進(jìn)行,然后再根據(jù)自己業(yè)務(wù)邏輯處理結(jié)果的不同再對(duì)拉取到的消息進(jìn)行不同的處理。SimpleConsumer#receive 也是通過長輪詢來接受來自服務(wù)端的消息,具體的長輪詢時(shí)間可以使用 SimpleConsumerBuilder#setAwaitDuration 來進(jìn)行設(shè)置。
在 SimpleConsumer 中,用戶需要通過 SimpleConsumer#receive 設(shè)置一個(gè)消息不重復(fù)的時(shí)間窗口(或者說關(guān)于通過這個(gè)接口收到的消息的一個(gè)不可見時(shí)間窗口),這個(gè)時(shí)間窗口從用戶接受到這條消息開始計(jì)時(shí),在這段時(shí)間之內(nèi)消息是不會(huì)重復(fù)投遞到消費(fèi)者的,而超出這個(gè)時(shí)間窗口之后,則會(huì)對(duì)這條消息進(jìn)行再一次的投遞。在這個(gè)過程中,消息的消費(fèi)次數(shù)也會(huì)進(jìn)行遞增。與 PushConsumer 類似的是,一旦消費(fèi)次數(shù)超出 ConsumerGroup 的最大次數(shù),也就不會(huì)進(jìn)行重投了。
相比較于 PushConsumer 而言,SimpleConsumer 用戶可以自主控制接受消息的節(jié)奏。SimpleConsumer#receive 會(huì)針對(duì)于當(dāng)前的訂閱關(guān)系去服務(wù)端拉取符合條件的消息。SimpleConsumer 實(shí)際上的每次消息接收請(qǐng)求是按照具體 Topic 的分區(qū)來 one by one 發(fā)起請(qǐng)求的,實(shí)際的 Topic 分區(qū)可能會(huì)比較多,因此為了保證消息接收的及時(shí)性,建議綜合自己的業(yè)務(wù)處理能力一定程度上提高 SimpleConsumer#receive 的并發(fā)度。
用戶在接受到消息之后,可以選擇對(duì)消息使用 ack 或者 changeInvisibleDuration,前者即對(duì)服務(wù)端表示對(duì)這條消息的確認(rèn),與 PushConsumer 中的消費(fèi)成功類似,而 changeInvisibleDuration 則表示延遲當(dāng)前消息的可見時(shí)間,即需要服務(wù)端在當(dāng)前一段時(shí)間之后再向客戶端進(jìn)行投遞。值得注意的是,這里消息的再次投遞也是需要遵循 ConsumerGroup 的最大消費(fèi)次數(shù)的限制,即一旦消息的最大消費(fèi)次數(shù)超出了最大消費(fèi)次數(shù)(每次消息到達(dá)可見時(shí)間都會(huì)進(jìn)行消費(fèi)次數(shù)的自增),則不再進(jìn)行投遞,轉(zhuǎn)而進(jìn)入死信隊(duì)列。舉例來說:
- 進(jìn)行 ack,即表示消息消費(fèi)成功被確認(rèn),消費(fèi)進(jìn)度被服務(wù)端同步。
- 進(jìn)行 changeInvisibleDuration,
1)如果消息已經(jīng)超過當(dāng)前 ConsumerGroup 的最大消費(fèi)次數(shù),那么消息后續(xù)會(huì)被投遞進(jìn)入死信隊(duì)列
2)如果消息未超過當(dāng)前 ConsumerGroup 的最大消費(fèi)次數(shù),若請(qǐng)求在上一次消息可見時(shí)間到來之前發(fā)起,則修改成功,否則則修改失敗。
應(yīng)用場(chǎng)景與最佳實(shí)踐
在 PushConsumer 中,消息是單條地被投遞進(jìn)入 MessageListener來處理的,而在 SimpleConsumer 中用戶可以同時(shí)拿到一批消息,每批消息的最大條數(shù)也由 SimpleConsumer#receive 來決定。在一些 IO 密集型的應(yīng)用中,會(huì)是一個(gè)更加方便的選擇。此時(shí)用戶可以每次拿到一批消息并集中進(jìn)行處理從而提高消費(fèi)速度。
PullConsumer
PullConsumer 也是 RocketMQ 一直以來都支持的消費(fèi)者類型,RocketMQ 5.0 中全新的 PullConsumer API 還在演進(jìn)中,敬請(qǐng)期待。下文中的 PullConsumer 會(huì)使用 4.0 中現(xiàn)存的 LitePullConsumer 進(jìn)行論述,也是當(dāng)前推薦的方式。
使用簡介
現(xiàn)存的 LitePullConsumer 中的主要接口
// PullConsumer 中的主要接口 public interface LitePullConsumer { // 注冊(cè)路由變化監(jiān)聽器 void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; // 將隊(duì)列 assign 給當(dāng)前消費(fèi)者 void assign(Collection<MessageQueue> messageQueues); // 針對(duì)當(dāng)前 assigned 的隊(duì)列獲取消息 List<MessageExt> poll(long timeout); // 查找當(dāng)前隊(duì)列在服務(wù)端提交的位點(diǎn) Long committed(MessageQueue messageQueue) throws MQClientException; // 設(shè)置是否自動(dòng)提交隊(duì)列位點(diǎn) void setAutoCommit(boolean autoCommit); // 同步提交隊(duì)列位點(diǎn) void commitSync(); }
在 RocketMQ 中,無論是消息的發(fā)送還是接收,都是通過隊(duì)列來進(jìn)行的,一個(gè) Topic 由若干個(gè)隊(duì)列組成,消息本身也是按照隊(duì)列的形式來一個(gè)個(gè)進(jìn)行存儲(chǔ)的,同一個(gè)隊(duì)列中的消息擁有不同的位點(diǎn),且位點(diǎn)的大小是隨隨消息達(dá)到服務(wù)端的時(shí)間逐次遞增的,本質(zhì)上不同 ConsumerGroup 在服務(wù)端的消費(fèi)進(jìn)度就是一個(gè)個(gè)隊(duì)列中的位點(diǎn)信息,客戶端將自己的消費(fèi)進(jìn)度同步給服務(wù)端本質(zhì)上其實(shí)就是在同步一個(gè)個(gè)消息的位點(diǎn)。
在 PullConsumer 中將隊(duì)列這個(gè)概念完整地暴露給了用戶。用戶可以針對(duì)自己關(guān)心的 topic 設(shè)置路由監(jiān)聽器從而感知隊(duì)列的變化,并將隊(duì)列 assign 給當(dāng)前消費(fèi)者,當(dāng)用戶使用 LitePullConsumer#poll 時(shí)會(huì)嘗試獲取已經(jīng) assign 好了的隊(duì)列中的消息。如果設(shè)置了 LitePullConsumer#setAutoCommit 的話,一旦消息達(dá)到了客戶端就會(huì)自動(dòng)進(jìn)行位點(diǎn)的提交,否則則需要使用 LitePullConsumer#commitSync 接口來進(jìn)行手動(dòng)提交。
應(yīng)用場(chǎng)景與最佳實(shí)踐
PullConsumer 中用戶擁有對(duì)消息位點(diǎn)管理的絕對(duì)自主權(quán),可以自行管理消費(fèi)進(jìn)度,這是與 PushConsumer 和 SimpleConsumer 最為本質(zhì)的不同,這也使得 PullConsumer 在流計(jì)算這種需要同時(shí)自主控制消費(fèi)速率和消費(fèi)進(jìn)度的場(chǎng)景能得到非常廣泛的應(yīng)用。更多情況下,PullConsumer 是與具體的流計(jì)算框架進(jìn)行集成的。
到此這篇關(guān)于RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解的文章就介紹到這了,更多相關(guān)RocketMQ的消費(fèi)者類型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題
- RocketMQ中消費(fèi)者的消費(fèi)進(jìn)度管理
- RocketMQ中的消費(fèi)者啟動(dòng)流程解讀
- RocketMQ中消費(fèi)者概念和消費(fèi)流程詳解
- 詳解RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程分析
- RocketMQ4.5.X 實(shí)現(xiàn)修改生產(chǎn)者消費(fèi)者日志保存路徑
- RocketMq同組消費(fèi)者如何自動(dòng)設(shè)置InstanceName
相關(guān)文章
Maven項(xiàng)目如何查找jar包是由哪個(gè)依賴引入的
這篇文章主要介紹了Maven項(xiàng)目如何查找jar包是由哪個(gè)依賴引入的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08Java 8對(duì)LinkedList元素進(jìn)行排序的方法詳解
在Java中,LinkedList是一種基于鏈表的數(shù)據(jù)結(jié)構(gòu),與ArrayList相比,它在進(jìn)行插入和刪除操作時(shí)表現(xiàn)出更好的性能,然而,LinkedList的元素排序也是開發(fā)中常見的需求之一,本文介紹了Java8對(duì)LinkedList元素進(jìn)行排序的方法,需要的朋友可以參考下2024-11-11JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景
這篇文章主要給大家介紹了關(guān)于JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程
最近一個(gè)項(xiàng)目里面出現(xiàn)了一個(gè)比較詭異的問題,給大家分享下,這篇文章主要給大家介紹了關(guān)于dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程,需要的朋友可以參考下2023-02-02SpringBoot如何實(shí)現(xiàn)一個(gè)實(shí)時(shí)更新的進(jìn)度條的示例代碼
本文詳細(xì)的介紹了SpringBoot如何實(shí)現(xiàn)一個(gè)實(shí)時(shí)更新的進(jìn)度條,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05SpringBoot解決循環(huán)調(diào)用問題
作者在將SpringBoot從1.5版本升級(jí)至2.6版本,并遷移至阿里云上運(yùn)行后,遇到了循環(huán)調(diào)用問題,在Jetty容器中運(yùn)行沒問題,但在Tomcat容器中就出現(xiàn)了循環(huán)引用問題,原因是SpringBoot 2.6不鼓勵(lì)循環(huán)引用,暴露出該問題,作者提供了兩種解決思路2024-10-10xxl-job 帶參數(shù)執(zhí)行和高可用部署方法
這篇文章主要介紹了xxl-job 帶參數(shù)執(zhí)行和高可用部署,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04