欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解

 更新時(shí)間:2023年10月11日 10:50:00   作者:阿里云云原生  
這篇文章主要介紹了RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解,在?RocketMQ?5.0?中,更加強(qiáng)調(diào)了客戶端類型的概念,尤其是消費(fèi)者類型,為了滿足多樣的?RocketMQ?中一共有三種不同的消費(fèi)者類型,分別是?PushConsumer、SimpleConsumer?和?PullConsumer,需要的朋友可以參考下

消費(fèi)者類型概覽

在介紹不同的消息類型之前,先明確一下不同 RocketMQ 消費(fèi)者中的一個(gè)通用工作流程:在消費(fèi)者中,到達(dá)客戶端的消息都是由客戶端主動(dòng)向服務(wù)端請(qǐng)求并掛起長輪詢獲得的。

為了保證消息到達(dá)的及時(shí)性,客戶端需要不斷地向服務(wù)端發(fā)起請(qǐng)求(請(qǐng)求是否需要由客戶端主動(dòng)發(fā)起則與具體的客戶端類型有關(guān)),而新的符合條件的消息一旦到達(dá)服務(wù)端,就會(huì)客戶端請(qǐng)求走。最終根據(jù)客戶端處理的結(jié)果不同,服務(wù)端對(duì)消息的處理結(jié)果進(jìn)行記錄。

在這里插入圖片描述

另外 PushConsumer 和 SimpleConsumer 中還會(huì)有一個(gè) ConsumerGroup 的概念,ConsumerGroup 相當(dāng)于是一組相同訂閱關(guān)系的消費(fèi)者的共同身份標(biāo)識(shí)。而服務(wù)端也會(huì)根據(jù) ConsumerGroup 來記錄對(duì)應(yīng)的消費(fèi)進(jìn)度。同一個(gè) ConsumerGroup 下的消息消費(fèi)者將共同消費(fèi)符合當(dāng)前訂閱組要求的所有消息,而不是獨(dú)立進(jìn)行消費(fèi)。相比較于 PullConsumer,PushConsumer 和 SimpleConsumer 更加適用于業(yè)務(wù)集成的場(chǎng)景,由服務(wù)端來托管消費(fèi)狀態(tài)和進(jìn)度,相對(duì)來說更加的輕量與簡單。

簡單來說:

  • PushConsumer : 全托管的消費(fèi)者類型,用戶只需要注冊(cè)消息監(jiān)聽器即可,符合對(duì)應(yīng)訂閱關(guān)系的消息就會(huì)調(diào)用對(duì)應(yīng)的消費(fèi)方法,是與業(yè)務(wù)集成最為普遍的消費(fèi)者類型。
  • SimpleConsumer: 解耦消息消費(fèi)與進(jìn)度同步的消費(fèi)者類型,用戶自主接受來自服務(wù)端的消息,并對(duì)單條消息進(jìn)行消息確認(rèn)。和 PushConsumer 一樣也由服務(wù)端托管消費(fèi)進(jìn)度,適用于用戶需要自主控制消費(fèi)速率的業(yè)務(wù)場(chǎng)景。
  • PullConsumer: 使用流處理框架進(jìn)行管理的消費(fèi)者類型,用戶按照隊(duì)列(Topic 的最小邏輯組成單位)來進(jìn)行消息的接收并可以選擇自動(dòng)或者手動(dòng)提交消費(fèi)位點(diǎn)。

PushConsumer

PushConsumer 是 RocketMQ 目前使用最為廣泛的消費(fèi)者。用戶只需要確認(rèn)好訂閱關(guān)系之后,注冊(cè)相對(duì)應(yīng)的 Listener 即可。符合對(duì)應(yīng)訂閱關(guān)系的消息在由 Producer 發(fā)出后,消費(fèi)者的 Listener 接口也會(huì)被即時(shí)調(diào)用,那么此時(shí)用戶需要在 Listener 中去實(shí)現(xiàn)對(duì)應(yīng)的業(yè)務(wù)邏輯。

使用簡介

以下是 Push 消費(fèi)者的使用示例:

PushConsumer pushConsumer = provider.newPushConsumerBuilder()
 .setClientConfiguration(clientConfiguration)
    // set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // handle the received message and return consume result.
        LOGGER.info("consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();
// block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// close the push consumer when you don't need it anymore.
pushConsumer.close();

用戶需要根據(jù)自己業(yè)務(wù)處理結(jié)果的不同來返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。當(dāng)用戶返回 ConsumeResult.SUCCESS時(shí),消息則被視為消費(fèi)成功;當(dāng)用戶返回 ConsumeResult.FAILURE時(shí),則服務(wù)端視為消費(fèi)失敗,會(huì)進(jìn)行該條消息的退避重試,消息的退避重試是指,在消息被消費(fèi)成功之前,當(dāng)前消息會(huì)被多次投遞到用戶注冊(cè)的 MessageListener 中直到消費(fèi)成功,而兩次消費(fèi)之間的時(shí)間間隔則是符合退避規(guī)律的。

特別的,每個(gè) ConsumerGroup 都會(huì)有一個(gè)最大消費(fèi)次數(shù)的設(shè)置,如果當(dāng)前消息的消費(fèi)次數(shù)超過了這個(gè)設(shè)置,則消息不會(huì)再被投遞,轉(zhuǎn)而被投遞進(jìn)入死信隊(duì)列。這個(gè)消費(fèi)次數(shù)在消息每次被投遞到 MessageListener 時(shí)都會(huì)進(jìn)行自增。譬如:如果消息的最大消費(fèi)次數(shù)為 1,那么無論對(duì)于這條消息,當(dāng)前是被返回消費(fèi)成功還是消費(fèi)失敗,都只會(huì)被消費(fèi)這一次。

應(yīng)用場(chǎng)景與最佳實(shí)踐

PushConsumer 是一種近乎全托管的消費(fèi)者,這里的托管的含義在于用戶本身并不需要關(guān)心消息的接收,而只需要關(guān)注消息的消費(fèi)過程,除此之外的所有邏輯都在 Push 消費(fèi)者的實(shí)現(xiàn)中封裝掉了,用戶只需要根據(jù)每條收到的消息返回不同的消費(fèi)結(jié)果即可,因此也是最為普適的消費(fèi)者類型。

MessageListener 是針對(duì)單條消息設(shè)計(jì)的監(jiān)聽器接口:

/**
* MessageListener is used only for the push consumer to process message consumption synchronously.
 *
 * <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the
 * backend thread pool to consumer message concurrently.
 */
public interface MessageListener {
    /**
     * The callback interface to consume the message.
     *
     * <p>You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}.
     * The consumption is successful only when {@link ConsumeResult#SUCCESS } is returned, null pointer is returned
     * or exception is thrown would cause message consumption failure too.
     */
    ConsumeResult consume(MessageView messageView);
}

絕大多數(shù)場(chǎng)景下,使用方應(yīng)該快速處理消費(fèi)邏輯并返回消費(fèi)成功,不宜長時(shí)間阻塞消費(fèi)邏輯。對(duì)于消費(fèi)邏輯比較重的情形,建議可以先行提交消費(fèi)狀態(tài),然后對(duì)消息進(jìn)行異步處理。

實(shí)際在 Push 消費(fèi)者的實(shí)現(xiàn)中,為了保證消息消費(fèi)的及時(shí)性,消息是會(huì)被預(yù)先拉取客戶端再進(jìn)行后續(xù)的消費(fèi)的,因此在客戶端中存在對(duì)已拉取消息大小的緩存。為了防止緩存的消息過多導(dǎo)致客戶端內(nèi)存泄漏,也提前預(yù)留了客戶端參數(shù)供使用者自行進(jìn)行設(shè)置。

// 設(shè)置本地最大緩存消息數(shù)目為 16 條
pushConsumer.setMaxCachedMessageCount(16);
// 設(shè)置本地最大緩存消息占用內(nèi)存大小為 128 MB
pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);

SimpleConsumer

相比較 PushConsumer,SimpleConsumer 則暴露了更多的細(xì)節(jié)給使用者。在 SimpleConsumer 中,用戶將自行控制消息的接收與處理。

使用簡介

以下是 SimpleConsumer 的使用示例:

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // set await duration for long-polling.
    .setAwaitDuration(awaitDuration)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
LOGGER.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
    final MessageId messageId = message.getMessageId();
    try {
        consumer.ack(message);
        LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
    } catch (Throwable t) {
        LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
    }
}
// Close the simple consumer when you don't need it anymore.
consumer.close();

在 SimpleConsumer 中用戶需要自行進(jìn)行消息的拉取,這一動(dòng)作通過 SimpleConsumer#receive 這個(gè)接口進(jìn)行,然后再根據(jù)自己業(yè)務(wù)邏輯處理結(jié)果的不同再對(duì)拉取到的消息進(jìn)行不同的處理。SimpleConsumer#receive 也是通過長輪詢來接受來自服務(wù)端的消息,具體的長輪詢時(shí)間可以使用 SimpleConsumerBuilder#setAwaitDuration 來進(jìn)行設(shè)置。

在 SimpleConsumer 中,用戶需要通過 SimpleConsumer#receive 設(shè)置一個(gè)消息不重復(fù)的時(shí)間窗口(或者說關(guān)于通過這個(gè)接口收到的消息的一個(gè)不可見時(shí)間窗口),這個(gè)時(shí)間窗口從用戶接受到這條消息開始計(jì)時(shí),在這段時(shí)間之內(nèi)消息是不會(huì)重復(fù)投遞到消費(fèi)者的,而超出這個(gè)時(shí)間窗口之后,則會(huì)對(duì)這條消息進(jìn)行再一次的投遞。在這個(gè)過程中,消息的消費(fèi)次數(shù)也會(huì)進(jìn)行遞增。與 PushConsumer 類似的是,一旦消費(fèi)次數(shù)超出 ConsumerGroup 的最大次數(shù),也就不會(huì)進(jìn)行重投了。

在這里插入圖片描述

相比較于 PushConsumer 而言,SimpleConsumer 用戶可以自主控制接受消息的節(jié)奏。SimpleConsumer#receive 會(huì)針對(duì)于當(dāng)前的訂閱關(guān)系去服務(wù)端拉取符合條件的消息。SimpleConsumer 實(shí)際上的每次消息接收請(qǐng)求是按照具體 Topic 的分區(qū)來 one by one 發(fā)起請(qǐng)求的,實(shí)際的 Topic 分區(qū)可能會(huì)比較多,因此為了保證消息接收的及時(shí)性,建議綜合自己的業(yè)務(wù)處理能力一定程度上提高 SimpleConsumer#receive 的并發(fā)度。

用戶在接受到消息之后,可以選擇對(duì)消息使用 ack 或者 changeInvisibleDuration,前者即對(duì)服務(wù)端表示對(duì)這條消息的確認(rèn),與 PushConsumer 中的消費(fèi)成功類似,而 changeInvisibleDuration 則表示延遲當(dāng)前消息的可見時(shí)間,即需要服務(wù)端在當(dāng)前一段時(shí)間之后再向客戶端進(jìn)行投遞。值得注意的是,這里消息的再次投遞也是需要遵循 ConsumerGroup 的最大消費(fèi)次數(shù)的限制,即一旦消息的最大消費(fèi)次數(shù)超出了最大消費(fèi)次數(shù)(每次消息到達(dá)可見時(shí)間都會(huì)進(jìn)行消費(fèi)次數(shù)的自增),則不再進(jìn)行投遞,轉(zhuǎn)而進(jìn)入死信隊(duì)列。舉例來說:

  • 進(jìn)行 ack,即表示消息消費(fèi)成功被確認(rèn),消費(fèi)進(jìn)度被服務(wù)端同步。
  • 進(jìn)行 changeInvisibleDuration,

1)如果消息已經(jīng)超過當(dāng)前 ConsumerGroup 的最大消費(fèi)次數(shù),那么消息后續(xù)會(huì)被投遞進(jìn)入死信隊(duì)列

2)如果消息未超過當(dāng)前 ConsumerGroup 的最大消費(fèi)次數(shù),若請(qǐng)求在上一次消息可見時(shí)間到來之前發(fā)起,則修改成功,否則則修改失敗。

應(yīng)用場(chǎng)景與最佳實(shí)踐

在 PushConsumer 中,消息是單條地被投遞進(jìn)入 MessageListener來處理的,而在 SimpleConsumer 中用戶可以同時(shí)拿到一批消息,每批消息的最大條數(shù)也由 SimpleConsumer#receive 來決定。在一些 IO 密集型的應(yīng)用中,會(huì)是一個(gè)更加方便的選擇。此時(shí)用戶可以每次拿到一批消息并集中進(jìn)行處理從而提高消費(fèi)速度。

PullConsumer

PullConsumer 也是 RocketMQ 一直以來都支持的消費(fèi)者類型,RocketMQ 5.0 中全新的 PullConsumer API 還在演進(jìn)中,敬請(qǐng)期待。下文中的 PullConsumer 會(huì)使用 4.0 中現(xiàn)存的 LitePullConsumer 進(jìn)行論述,也是當(dāng)前推薦的方式。

使用簡介

現(xiàn)存的 LitePullConsumer 中的主要接口

// PullConsumer 中的主要接口
public interface LitePullConsumer {
 // 注冊(cè)路由變化監(jiān)聽器
void registerTopicMessageQueueChangeListener(String topic,
        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
    // 將隊(duì)列 assign 給當(dāng)前消費(fèi)者
    void assign(Collection<MessageQueue> messageQueues);
    // 針對(duì)當(dāng)前 assigned 的隊(duì)列獲取消息
    List<MessageExt> poll(long timeout);
    // 查找當(dāng)前隊(duì)列在服務(wù)端提交的位點(diǎn)
    Long committed(MessageQueue messageQueue) throws MQClientException;
    // 設(shè)置是否自動(dòng)提交隊(duì)列位點(diǎn)
    void setAutoCommit(boolean autoCommit);
    // 同步提交隊(duì)列位點(diǎn)
    void commitSync();
}

在 RocketMQ 中,無論是消息的發(fā)送還是接收,都是通過隊(duì)列來進(jìn)行的,一個(gè) Topic 由若干個(gè)隊(duì)列組成,消息本身也是按照隊(duì)列的形式來一個(gè)個(gè)進(jìn)行存儲(chǔ)的,同一個(gè)隊(duì)列中的消息擁有不同的位點(diǎn),且位點(diǎn)的大小是隨隨消息達(dá)到服務(wù)端的時(shí)間逐次遞增的,本質(zhì)上不同 ConsumerGroup 在服務(wù)端的消費(fèi)進(jìn)度就是一個(gè)個(gè)隊(duì)列中的位點(diǎn)信息,客戶端將自己的消費(fèi)進(jìn)度同步給服務(wù)端本質(zhì)上其實(shí)就是在同步一個(gè)個(gè)消息的位點(diǎn)。

在這里插入圖片描述

在 PullConsumer 中將隊(duì)列這個(gè)概念完整地暴露給了用戶。用戶可以針對(duì)自己關(guān)心的 topic 設(shè)置路由監(jiān)聽器從而感知隊(duì)列的變化,并將隊(duì)列 assign 給當(dāng)前消費(fèi)者,當(dāng)用戶使用 LitePullConsumer#poll 時(shí)會(huì)嘗試獲取已經(jīng) assign 好了的隊(duì)列中的消息。如果設(shè)置了 LitePullConsumer#setAutoCommit 的話,一旦消息達(dá)到了客戶端就會(huì)自動(dòng)進(jìn)行位點(diǎn)的提交,否則則需要使用 LitePullConsumer#commitSync 接口來進(jìn)行手動(dòng)提交。

應(yīng)用場(chǎng)景與最佳實(shí)踐

PullConsumer 中用戶擁有對(duì)消息位點(diǎn)管理的絕對(duì)自主權(quán),可以自行管理消費(fèi)進(jìn)度,這是與 PushConsumer 和 SimpleConsumer 最為本質(zhì)的不同,這也使得 PullConsumer 在流計(jì)算這種需要同時(shí)自主控制消費(fèi)速率和消費(fèi)進(jìn)度的場(chǎng)景能得到非常廣泛的應(yīng)用。更多情況下,PullConsumer 是與具體的流計(jì)算框架進(jìn)行集成的。

到此這篇關(guān)于RocketMQ的消費(fèi)者類型與最佳實(shí)踐詳解的文章就介紹到這了,更多相關(guān)RocketMQ的消費(fèi)者類型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Maven項(xiàng)目如何查找jar包是由哪個(gè)依賴引入的

    Maven項(xiàng)目如何查找jar包是由哪個(gè)依賴引入的

    這篇文章主要介紹了Maven項(xiàng)目如何查找jar包是由哪個(gè)依賴引入的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • spring boot admin 搭建詳解

    spring boot admin 搭建詳解

    本篇文章主要介紹了spring boot admin 搭建詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-04-04
  • Java 8對(duì)LinkedList元素進(jìn)行排序的方法詳解

    Java 8對(duì)LinkedList元素進(jìn)行排序的方法詳解

    在Java中,LinkedList是一種基于鏈表的數(shù)據(jù)結(jié)構(gòu),與ArrayList相比,它在進(jìn)行插入和刪除操作時(shí)表現(xiàn)出更好的性能,然而,LinkedList的元素排序也是開發(fā)中常見的需求之一,本文介紹了Java8對(duì)LinkedList元素進(jìn)行排序的方法,需要的朋友可以參考下
    2024-11-11
  • JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景

    JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景

    這篇文章主要給大家介紹了關(guān)于JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程

    dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程

    最近一個(gè)項(xiàng)目里面出現(xiàn)了一個(gè)比較詭異的問題,給大家分享下,這篇文章主要給大家介紹了關(guān)于dm.jdbc.driver.DMException網(wǎng)絡(luò)通信異常的解決過程,需要的朋友可以參考下
    2023-02-02
  • SpringBoot如何實(shí)現(xiàn)一個(gè)實(shí)時(shí)更新的進(jìn)度條的示例代碼

    SpringBoot如何實(shí)現(xiàn)一個(gè)實(shí)時(shí)更新的進(jìn)度條的示例代碼

    本文詳細(xì)的介紹了SpringBoot如何實(shí)現(xiàn)一個(gè)實(shí)時(shí)更新的進(jìn)度條,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • Idea打War包流程圖文教程

    Idea打War包流程圖文教程

    這篇文章主要給大家介紹了關(guān)于Idea打War包流程的相關(guān)資料,IDEA導(dǎo)出war包的方式與MyEclipse有一點(diǎn)不同,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-08-08
  • Spring Boot 中的 @Field 注解的原理解析

    Spring Boot 中的 @Field 注解的原理解析

    本文詳細(xì)介紹了 Spring Boot 中的 @Field 注解的原理和使用方法,通過使用 @Field 注解,我們可以將 HTTP 請(qǐng)求中的參數(shù)值自動(dòng)綁定到 Java 對(duì)象的屬性上,簡化了開發(fā)過程,提高了開發(fā)效率,感興趣的朋友跟隨小編一起看看吧
    2023-07-07
  • SpringBoot解決循環(huán)調(diào)用問題

    SpringBoot解決循環(huán)調(diào)用問題

    作者在將SpringBoot從1.5版本升級(jí)至2.6版本,并遷移至阿里云上運(yùn)行后,遇到了循環(huán)調(diào)用問題,在Jetty容器中運(yùn)行沒問題,但在Tomcat容器中就出現(xiàn)了循環(huán)引用問題,原因是SpringBoot 2.6不鼓勵(lì)循環(huán)引用,暴露出該問題,作者提供了兩種解決思路
    2024-10-10
  • xxl-job 帶參數(shù)執(zhí)行和高可用部署方法

    xxl-job 帶參數(shù)執(zhí)行和高可用部署方法

    這篇文章主要介紹了xxl-job 帶參數(shù)執(zhí)行和高可用部署,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-04-04

最新評(píng)論