欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Redis Stream做消息隊列

 更新時間:2022年09月23日 09:05:07   作者:在下uptown  
這篇文章主要介紹了詳解Redis Stream做消息隊列,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下

List

眾所周知redis數(shù)據(jù)結(jié)構(gòu)中的list的lpush與rpop可以用于常規(guī)消息隊列,從集合的最左端寫入,最右端彈出消費。并且支持多個生產(chǎn)者與多個消費者并發(fā)拿數(shù)據(jù),數(shù)據(jù)只能由一個消費者拿到。

但這個方案并不能保證消費者消費消息后是否成功處理的問題(服務(wù)掛掉或處理異常等),機制屬于點對點模式不能做廣播模式(發(fā)布/訂閱模式)

Pub/sub

于是redis提供了相應(yīng)的發(fā)布訂閱功能,為了解除點對點的強綁定模式引入了Channel管道。

當(dāng)生產(chǎn)者向管道中發(fā)布消息,訂閱了該管道的消費者能夠同時接收到該消息,而且為了簡化訂閱多個管道需要顯式關(guān)注多個名稱提供了pattern能力。

通過名稱匹配如果接收消息的頻道wmyskxz.chat,consumer3也會收到消息。

但這個方案也有很大的詬病就是不會持久化,如果服務(wù)掛掉重啟數(shù)據(jù)就全丟棄了,也沒有提供ack機制,不保證數(shù)據(jù)可靠性,不管有沒有消費成功發(fā)后既忘。

Stream

stream的話結(jié)構(gòu)很像kafka的設(shè)計思想,提供了consumer group和offset機制,結(jié)構(gòu)上感覺跟kafka的topic差不多,只是沒有對應(yīng)partation副本機制,而是一個追加消息的鏈表結(jié)構(gòu)??蛻舳苏{(diào)用XADD時候自動創(chuàng)建stream。每個消息都會持久化并存在唯一的id標(biāo)識

Consumer Group

消費者組的概念跟kafka的消費者概念如出一轍,消費者既可以用XREAD命令進(jìn)行獨立消費,也可以多個消費者同時加入一個消費者組。一條消息只能由一個消費者組中的一個消費者消費。這樣可以在分布式系統(tǒng)中保證消息的唯一性。

其實這個特性我后來仔細(xì)琢磨了一下當(dāng)時自認(rèn)為無懈可擊的流式圖表為了保證分布式系統(tǒng)消息唯一做了redis分布式鎖。有點雞肋,明明消費者組已經(jīng)保證了數(shù)據(jù)的唯一性。只能說加鎖可以壓縮資源成本

last_delivered_id

用于標(biāo)識消費者組消費在stream上消費位置的游標(biāo),每個消費者組都有一個stream內(nèi)唯一的名稱,消費者組不會自動創(chuàng)建,需要用XGROUP CREATE顯式創(chuàng)建。

pending_ids

每個消費者內(nèi)部都有一個狀態(tài)變量。用來表示已經(jīng)被客戶端消費但沒有ack的消費。目的是為了保證客戶端至少消費了消息一次(atleastonce)。如果消費者收到了消息處理完了但是沒有回復(fù)ack,就會導(dǎo)致列表不斷增長,如果有很多消費組的話,那么這個列表占用的內(nèi)存就會放大

curd

  • xadd 追加消息
  • xdel 刪除消息,這里的刪除僅僅是設(shè)置了標(biāo)志位,不影響消息總長度
  • xrange 獲取消息列表,會自動過濾已經(jīng)刪除的消息
  • xlen 消息長度
  • del 刪除Stream

pending_ids如何避免消息丟失

在客戶端消費者讀取Stream消息時,Redis服務(wù)器將消息回復(fù)給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。

但是pending_ids里已經(jīng)保存了發(fā)出去的消息ID。待客戶端重新連上之后,可以再次收到pending_ids中的消息ID列表。

不過此時xreadgroup的起始消息必須是任意有效的消息ID,一般將參數(shù)設(shè)為0-0,表示讀取所有的pending_ids消息以及自last_delivered_id之后的新消息。

嵌入SpringBoot

redis stream雖然還是有一些弊端,但是相比較而言用kafka之類的消息組件太重,redis用作消息隊列已經(jīng)很合適了。

這里簡單提一下思路,本質(zhì)上是提供一個管理消息的一個小功能,定義一個注解用于創(chuàng)建stream管道

創(chuàng)建一個注解類,標(biāo)注該注解的類必須繼承StreamListener<String, ObjectRecord<String, Object>>類且重寫onMessage方法。方法上也加這個注解

創(chuàng)建一個config類實現(xiàn)BeanPostProcessor接口,重寫bean聲明周期postProcessAfterInitializationpostProcessBeforeInitialization方法。該方法會在spring啟動流程里的refresh方法加載bean的聲明周期中掃描到所有加了注解的bean。

通過線程池挨個創(chuàng)建stream的group組與stream的consumer監(jiān)聽連接,config類記得繼承DisposableBean類在destroy方法里把連接關(guān)掉免得oom。

注冊redis stream api提供的consumer容器

這里一定注意pollTimeout參數(shù),看名字就知道默認(rèn)拉取數(shù)據(jù)時間間隔,這個參數(shù)如果寫的值很小或者寫0,你就看你cpu高不高就完了。

@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
         .batchSize(10)
         .serializer(new StringRedisSerializer())
         .executor(new ForkJoinPool())
         .pollTimeout(Duration.ofSeconds(3))
         .targetType(Object.class)
         .build();
   return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}

創(chuàng)建消費者

private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();

   if (stringRedisTemplate.hasKey(streamKey)) {
      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);

      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);

      groups.forEach(groupInfo -> {
         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
            groupHasKey.set(true);
         }
      });

      if (groups.isEmpty() || !groupHasKey.get()) {
         creatGroup(streamKey, group);
      } else {
         groups.stream().forEach(g -> {
            log.info("XInfoGroups:{}", g);
            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
            log.info("XInfoConsumers:{}", consumers);
         });
      }
   } else {
      creatGroup(streamKey, group);
   }
   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
   Consumer consumer = Consumer.from(group, consumerName);

   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
   listenerContainer.start();
   this.containerList.add(listenerContainer);
   return subscription;
}

到此這篇關(guān)于詳解Redis Stream做消息隊列的文章就介紹到這了,更多相關(guān)Redis Stream內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • redis事務(wù)如何解決超賣問題

    redis事務(wù)如何解決超賣問題

    使用Redis事務(wù)可以有效避免超賣問題,首先,通過MULTI命令開啟事務(wù),將需要執(zhí)行的多個命令加入到事務(wù)中,然后通過EXEC命令提交事務(wù),確保這些命令可以一次性、順序地執(zhí)行,在事務(wù)執(zhí)行期間,Redis服務(wù)器不會執(zhí)行其他客戶端的命令
    2024-11-11
  • 淺談Redis中bind的坑

    淺談Redis中bind的坑

    本文主要介紹了淺談Redis中bind的坑,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • 深入理解redis分布式鎖和消息隊列

    深入理解redis分布式鎖和消息隊列

    本篇文章主要介紹了深入理解redis分布式鎖和消息隊列,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • Redis中實現(xiàn)查找某個值的范圍

    Redis中實現(xiàn)查找某個值的范圍

    這篇文章主要介紹了Redis中實現(xiàn)查找某個值的范圍,本文的題引來了Redis作者Salvatore Sanfilippo(@antirez)的回答,比較經(jīng)典,需要的朋友可以參考下
    2015-06-06
  • redis中事務(wù)機制及樂觀鎖的實現(xiàn)

    redis中事務(wù)機制及樂觀鎖的實現(xiàn)

    這篇文章主要介紹了redis中事務(wù)機制及樂觀鎖的相關(guān)內(nèi)容,通過事務(wù)的執(zhí)行分析Redis樂觀鎖,具有一定參考價值,需要的朋友可以了解下。
    2017-10-10
  • Redis Template實現(xiàn)分布式鎖的實例代碼

    Redis Template實現(xiàn)分布式鎖的實例代碼

    使用Redis的SETNX命令獲取分布式鎖的步驟,接下來通過本文給大家介紹Redis Template實現(xiàn)分布式鎖的實例代碼,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2018-09-09
  • 阿里云服務(wù)器安裝配置redis的方法并且加入到開機啟動(推薦)

    阿里云服務(wù)器安裝配置redis的方法并且加入到開機啟動(推薦)

    這篇文章主要介紹了阿里云服務(wù)器安裝配置redis并且加入到開機啟動,需要的朋友可以參考下
    2017-12-12
  • 使用Redis實現(xiàn)微信步數(shù)排行榜功能

    使用Redis實現(xiàn)微信步數(shù)排行榜功能

    這篇文章主要介紹了使用Redis實現(xiàn)微信步數(shù)排行榜功能,本文通過圖文實例代碼相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-06-06
  • 一篇文章帶你弄清楚Redis的精髓

    一篇文章帶你弄清楚Redis的精髓

    Redis是一個開源的、支持網(wǎng)絡(luò)、基于內(nèi)存的鍵值對存儲系統(tǒng),它可以用作數(shù)據(jù)庫、緩存和消息中間件。它支持多種數(shù)據(jù)類型,包括字符串、散列、列表、集合、位圖等,擁有極快的讀寫速度,并且支持豐富的特性,如事務(wù)、持久化、復(fù)制、腳本、發(fā)布/訂閱等。
    2023-02-02
  • Redis主從同步配置的方法步驟(圖文)

    Redis主從同步配置的方法步驟(圖文)

    這篇文章主要介紹了Redis主從同步配置的方法步驟(圖文),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02

最新評論