RocketMQ的消費者類型與最佳實踐詳解
消費者類型概覽
在介紹不同的消息類型之前,先明確一下不同 RocketMQ 消費者中的一個通用工作流程:在消費者中,到達客戶端的消息都是由客戶端主動向服務(wù)端請求并掛起長輪詢獲得的。
為了保證消息到達的及時性,客戶端需要不斷地向服務(wù)端發(fā)起請求(請求是否需要由客戶端主動發(fā)起則與具體的客戶端類型有關(guān)),而新的符合條件的消息一旦到達服務(wù)端,就會客戶端請求走。最終根據(jù)客戶端處理的結(jié)果不同,服務(wù)端對消息的處理結(jié)果進行記錄。
另外 PushConsumer 和 SimpleConsumer 中還會有一個 ConsumerGroup 的概念,ConsumerGroup 相當于是一組相同訂閱關(guān)系的消費者的共同身份標識。而服務(wù)端也會根據(jù) ConsumerGroup 來記錄對應(yīng)的消費進度。同一個 ConsumerGroup 下的消息消費者將共同消費符合當前訂閱組要求的所有消息,而不是獨立進行消費。相比較于 PullConsumer,PushConsumer 和 SimpleConsumer 更加適用于業(yè)務(wù)集成的場景,由服務(wù)端來托管消費狀態(tài)和進度,相對來說更加的輕量與簡單。
簡單來說:
- PushConsumer : 全托管的消費者類型,用戶只需要注冊消息監(jiān)聽器即可,符合對應(yīng)訂閱關(guān)系的消息就會調(diào)用對應(yīng)的消費方法,是與業(yè)務(wù)集成最為普遍的消費者類型。
- SimpleConsumer: 解耦消息消費與進度同步的消費者類型,用戶自主接受來自服務(wù)端的消息,并對單條消息進行消息確認。和 PushConsumer 一樣也由服務(wù)端托管消費進度,適用于用戶需要自主控制消費速率的業(yè)務(wù)場景。
- PullConsumer: 使用流處理框架進行管理的消費者類型,用戶按照隊列(Topic 的最小邏輯組成單位)來進行消息的接收并可以選擇自動或者手動提交消費位點。
PushConsumer
PushConsumer 是 RocketMQ 目前使用最為廣泛的消費者。用戶只需要確認好訂閱關(guān)系之后,注冊相對應(yīng)的 Listener 即可。符合對應(yīng)訂閱關(guān)系的消息在由 Producer 發(fā)出后,消費者的 Listener 接口也會被即時調(diào)用,那么此時用戶需要在 Listener 中去實現(xiàn)對應(yīng)的業(yè)務(wù)邏輯。
使用簡介
以下是 Push 消費者的使用示例:
PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // set the consumer group name. .setConsumerGroup(consumerGroup) // set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // handle the received message and return consume result. LOGGER.info("consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // close the push consumer when you don't need it anymore. pushConsumer.close();
用戶需要根據(jù)自己業(yè)務(wù)處理結(jié)果的不同來返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。當用戶返回 ConsumeResult.SUCCESS時,消息則被視為消費成功;當用戶返回 ConsumeResult.FAILURE時,則服務(wù)端視為消費失敗,會進行該條消息的退避重試,消息的退避重試是指,在消息被消費成功之前,當前消息會被多次投遞到用戶注冊的 MessageListener 中直到消費成功,而兩次消費之間的時間間隔則是符合退避規(guī)律的。
特別的,每個 ConsumerGroup 都會有一個最大消費次數(shù)的設(shè)置,如果當前消息的消費次數(shù)超過了這個設(shè)置,則消息不會再被投遞,轉(zhuǎn)而被投遞進入死信隊列。這個消費次數(shù)在消息每次被投遞到 MessageListener 時都會進行自增。譬如:如果消息的最大消費次數(shù)為 1,那么無論對于這條消息,當前是被返回消費成功還是消費失敗,都只會被消費這一次。
應(yīng)用場景與最佳實踐
PushConsumer 是一種近乎全托管的消費者,這里的托管的含義在于用戶本身并不需要關(guān)心消息的接收,而只需要關(guān)注消息的消費過程,除此之外的所有邏輯都在 Push 消費者的實現(xiàn)中封裝掉了,用戶只需要根據(jù)每條收到的消息返回不同的消費結(jié)果即可,因此也是最為普適的消費者類型。
MessageListener 是針對單條消息設(shè)計的監(jiān)聽器接口:
/** * MessageListener is used only for the push consumer to process message consumption synchronously. * * <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the * backend thread pool to consumer message concurrently. */ public interface MessageListener { /** * The callback interface to consume the message. * * <p>You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}. * The consumption is successful only when {@link ConsumeResult#SUCCESS } is returned, null pointer is returned * or exception is thrown would cause message consumption failure too. */ ConsumeResult consume(MessageView messageView); }
絕大多數(shù)場景下,使用方應(yīng)該快速處理消費邏輯并返回消費成功,不宜長時間阻塞消費邏輯。對于消費邏輯比較重的情形,建議可以先行提交消費狀態(tài),然后對消息進行異步處理。
實際在 Push 消費者的實現(xiàn)中,為了保證消息消費的及時性,消息是會被預(yù)先拉取客戶端再進行后續(xù)的消費的,因此在客戶端中存在對已拉取消息大小的緩存。為了防止緩存的消息過多導(dǎo)致客戶端內(nèi)存泄漏,也提前預(yù)留了客戶端參數(shù)供使用者自行進行設(shè)置。
// 設(shè)置本地最大緩存消息數(shù)目為 16 條 pushConsumer.setMaxCachedMessageCount(16); // 設(shè)置本地最大緩存消息占用內(nèi)存大小為 128 MB pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);
SimpleConsumer
相比較 PushConsumer,SimpleConsumer 則暴露了更多的細節(jié)給使用者。在 SimpleConsumer 中,用戶將自行控制消息的接收與處理。
使用簡介
以下是 SimpleConsumer 的使用示例:
SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for long-polling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); LOGGER.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } // Close the simple consumer when you don't need it anymore. consumer.close();
在 SimpleConsumer 中用戶需要自行進行消息的拉取,這一動作通過 SimpleConsumer#receive 這個接口進行,然后再根據(jù)自己業(yè)務(wù)邏輯處理結(jié)果的不同再對拉取到的消息進行不同的處理。SimpleConsumer#receive 也是通過長輪詢來接受來自服務(wù)端的消息,具體的長輪詢時間可以使用 SimpleConsumerBuilder#setAwaitDuration 來進行設(shè)置。
在 SimpleConsumer 中,用戶需要通過 SimpleConsumer#receive 設(shè)置一個消息不重復(fù)的時間窗口(或者說關(guān)于通過這個接口收到的消息的一個不可見時間窗口),這個時間窗口從用戶接受到這條消息開始計時,在這段時間之內(nèi)消息是不會重復(fù)投遞到消費者的,而超出這個時間窗口之后,則會對這條消息進行再一次的投遞。在這個過程中,消息的消費次數(shù)也會進行遞增。與 PushConsumer 類似的是,一旦消費次數(shù)超出 ConsumerGroup 的最大次數(shù),也就不會進行重投了。
相比較于 PushConsumer 而言,SimpleConsumer 用戶可以自主控制接受消息的節(jié)奏。SimpleConsumer#receive 會針對于當前的訂閱關(guān)系去服務(wù)端拉取符合條件的消息。SimpleConsumer 實際上的每次消息接收請求是按照具體 Topic 的分區(qū)來 one by one 發(fā)起請求的,實際的 Topic 分區(qū)可能會比較多,因此為了保證消息接收的及時性,建議綜合自己的業(yè)務(wù)處理能力一定程度上提高 SimpleConsumer#receive 的并發(fā)度。
用戶在接受到消息之后,可以選擇對消息使用 ack 或者 changeInvisibleDuration,前者即對服務(wù)端表示對這條消息的確認,與 PushConsumer 中的消費成功類似,而 changeInvisibleDuration 則表示延遲當前消息的可見時間,即需要服務(wù)端在當前一段時間之后再向客戶端進行投遞。值得注意的是,這里消息的再次投遞也是需要遵循 ConsumerGroup 的最大消費次數(shù)的限制,即一旦消息的最大消費次數(shù)超出了最大消費次數(shù)(每次消息到達可見時間都會進行消費次數(shù)的自增),則不再進行投遞,轉(zhuǎn)而進入死信隊列。舉例來說:
- 進行 ack,即表示消息消費成功被確認,消費進度被服務(wù)端同步。
- 進行 changeInvisibleDuration,
1)如果消息已經(jīng)超過當前 ConsumerGroup 的最大消費次數(shù),那么消息后續(xù)會被投遞進入死信隊列
2)如果消息未超過當前 ConsumerGroup 的最大消費次數(shù),若請求在上一次消息可見時間到來之前發(fā)起,則修改成功,否則則修改失敗。
應(yīng)用場景與最佳實踐
在 PushConsumer 中,消息是單條地被投遞進入 MessageListener來處理的,而在 SimpleConsumer 中用戶可以同時拿到一批消息,每批消息的最大條數(shù)也由 SimpleConsumer#receive 來決定。在一些 IO 密集型的應(yīng)用中,會是一個更加方便的選擇。此時用戶可以每次拿到一批消息并集中進行處理從而提高消費速度。
PullConsumer
PullConsumer 也是 RocketMQ 一直以來都支持的消費者類型,RocketMQ 5.0 中全新的 PullConsumer API 還在演進中,敬請期待。下文中的 PullConsumer 會使用 4.0 中現(xiàn)存的 LitePullConsumer 進行論述,也是當前推薦的方式。
使用簡介
現(xiàn)存的 LitePullConsumer 中的主要接口
// PullConsumer 中的主要接口 public interface LitePullConsumer { // 注冊路由變化監(jiān)聽器 void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; // 將隊列 assign 給當前消費者 void assign(Collection<MessageQueue> messageQueues); // 針對當前 assigned 的隊列獲取消息 List<MessageExt> poll(long timeout); // 查找當前隊列在服務(wù)端提交的位點 Long committed(MessageQueue messageQueue) throws MQClientException; // 設(shè)置是否自動提交隊列位點 void setAutoCommit(boolean autoCommit); // 同步提交隊列位點 void commitSync(); }
在 RocketMQ 中,無論是消息的發(fā)送還是接收,都是通過隊列來進行的,一個 Topic 由若干個隊列組成,消息本身也是按照隊列的形式來一個個進行存儲的,同一個隊列中的消息擁有不同的位點,且位點的大小是隨隨消息達到服務(wù)端的時間逐次遞增的,本質(zhì)上不同 ConsumerGroup 在服務(wù)端的消費進度就是一個個隊列中的位點信息,客戶端將自己的消費進度同步給服務(wù)端本質(zhì)上其實就是在同步一個個消息的位點。
在 PullConsumer 中將隊列這個概念完整地暴露給了用戶。用戶可以針對自己關(guān)心的 topic 設(shè)置路由監(jiān)聽器從而感知隊列的變化,并將隊列 assign 給當前消費者,當用戶使用 LitePullConsumer#poll 時會嘗試獲取已經(jīng) assign 好了的隊列中的消息。如果設(shè)置了 LitePullConsumer#setAutoCommit 的話,一旦消息達到了客戶端就會自動進行位點的提交,否則則需要使用 LitePullConsumer#commitSync 接口來進行手動提交。
應(yīng)用場景與最佳實踐
PullConsumer 中用戶擁有對消息位點管理的絕對自主權(quán),可以自行管理消費進度,這是與 PushConsumer 和 SimpleConsumer 最為本質(zhì)的不同,這也使得 PullConsumer 在流計算這種需要同時自主控制消費速率和消費進度的場景能得到非常廣泛的應(yīng)用。更多情況下,PullConsumer 是與具體的流計算框架進行集成的。
到此這篇關(guān)于RocketMQ的消費者類型與最佳實踐詳解的文章就介紹到這了,更多相關(guān)RocketMQ的消費者類型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程
最近一個項目里面出現(xiàn)了一個比較詭異的問題,給大家分享下,這篇文章主要給大家介紹了關(guān)于dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程,需要的朋友可以參考下2023-02-02SpringBoot如何實現(xiàn)一個實時更新的進度條的示例代碼
本文詳細的介紹了SpringBoot如何實現(xiàn)一個實時更新的進度條,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05SpringBoot解決循環(huán)調(diào)用問題
作者在將SpringBoot從1.5版本升級至2.6版本,并遷移至阿里云上運行后,遇到了循環(huán)調(diào)用問題,在Jetty容器中運行沒問題,但在Tomcat容器中就出現(xiàn)了循環(huán)引用問題,原因是SpringBoot 2.6不鼓勵循環(huán)引用,暴露出該問題,作者提供了兩種解決思路2024-10-10xxl-job 帶參數(shù)執(zhí)行和高可用部署方法
這篇文章主要介紹了xxl-job 帶參數(shù)執(zhí)行和高可用部署,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-04-04