欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

redis stream 實(shí)現(xiàn)消息隊(duì)列的實(shí)踐

 更新時(shí)間:2022年08月10日 11:43:07   作者:愛(ài)碼猿  
本文主要介紹了redis stream 實(shí)現(xiàn)消息隊(duì)列的實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

Redis5.0帶來(lái)了Stream類(lèi)型。從字面上看是流類(lèi)型,但其實(shí)從功能上看,應(yīng)該是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。

基于redis實(shí)現(xiàn)消息隊(duì)列的方式有很多:

  • PUB/SUB,訂閱/發(fā)布模式
  • 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)

redis 實(shí)現(xiàn)消息對(duì)列4中方法

發(fā)布訂閱

發(fā)布訂閱優(yōu)點(diǎn): 典型的一對(duì)的,所有消費(fèi)者都能同時(shí)消費(fèi)到消息。主動(dòng)通知訂閱者而不是訂閱者輪詢(xún)?nèi)プx。

發(fā)布訂閱缺點(diǎn): 不支持多個(gè)消費(fèi)者公平消費(fèi)消息,消息沒(méi)有持久化,不管訂閱者是否收到消息,消息都會(huì)丟失。

使用場(chǎng)景:微服務(wù)間的消息同步,如 分布式webSocker,數(shù)據(jù)同步等。

list 隊(duì)列

生產(chǎn)者通過(guò)lpush生成消息,消費(fèi)者通過(guò)blpop阻塞讀取消息。

**list隊(duì)列優(yōu)點(diǎn):**支持多個(gè)消費(fèi)者公平消費(fèi)消息,對(duì)消息進(jìn)行存儲(chǔ),可以通過(guò)lrange查詢(xún)隊(duì)列內(nèi)的消息。

**list隊(duì)列缺點(diǎn):**blpop仍然會(huì)阻塞當(dāng)前連接,導(dǎo)致連接不可用。一旦blpop成功消息就丟棄了,期間如果服務(wù)器宕機(jī)消息會(huì)丟失,不支持一對(duì)多消費(fèi)者。

zset 隊(duì)列

生產(chǎn)者通過(guò)zadd 創(chuàng)建消息時(shí)指定分?jǐn)?shù),可以確定消息的順序,消費(fèi)者通過(guò)zrange獲取消息后進(jìn)行消費(fèi),消費(fèi)完后通zrem刪除消息。

zset優(yōu)點(diǎn): 保證了消息的順序,消費(fèi)者消費(fèi)失敗后重新入隊(duì)不會(huì)打亂消費(fèi)順序。

zset缺點(diǎn): 不支持一對(duì)多消費(fèi),多個(gè)消費(fèi)者消費(fèi)時(shí)可能出現(xiàn)讀取同一條消息的情況,得通過(guò)加鎖或其他方式解決消費(fèi)的冪等性。

zset使用場(chǎng)景:由于數(shù)據(jù)是有序的,常常被用于延遲隊(duì)列,如 redisson的DelayQueue

Stream 隊(duì)列

Redis5.0帶來(lái)了Stream類(lèi)型。從字面上看是流類(lèi)型,但其實(shí)從功能上看,應(yīng)該是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。

參考kafka的思想,通過(guò)多個(gè)消費(fèi)者組和消費(fèi)者支持一對(duì)多消費(fèi),公平消費(fèi),消費(fèi)者內(nèi)維護(hù)了pending列表防止消息丟失。

提供消息ack機(jī)制。

基本命令

xadd 生產(chǎn)消息

往 stream 內(nèi)創(chuàng)建消息 語(yǔ)法為:

XADD key ID field string [field string …]

# * 表示自動(dòng)生成id redis會(huì)根據(jù)時(shí)間戳+序列號(hào)自動(dòng)生成id,不建議我們自己指定id
xadd stream1 * name zs age 23  

讀取消息

讀取stream內(nèi)的消息,這個(gè)并不是消費(fèi),只是提供了查看數(shù)據(jù)的功能,語(yǔ)法為:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#表示從 stream1 內(nèi)取出一條消息,從第0條消息讀取(0表示最小的id)
xread count 1 streams stream1 0
#表示從 stream1 內(nèi) id=1649143363972-0 開(kāi)始讀取一條消息,讀取的是指定id的下一條消息
xread count 1 streams msg 1649143363972-0

#表示一直阻塞讀取最新的消息($表示獲取下一個(gè)生成的消息)
xread count 1 block 0 streams stream1 $ 

xrange stream - + 10

XRANGE key startID endID count

#表示從stream1內(nèi)取10條消息 起始位置為 -(最小ID) 結(jié)束位置為+(最大ID)
xrange stream1 - + 10 

xgroup 消費(fèi)者組

redis stream 借鑒了kafka的設(shè)計(jì),采用了消費(fèi)者和消費(fèi)者組的概念。允許多個(gè)消費(fèi)者組消費(fèi)stream的消息,每個(gè)消費(fèi)者組都能收到完整的消息,例如:stream內(nèi)有10條消息,消費(fèi)者組A和消費(fèi)者組B同時(shí)消費(fèi)時(shí),都能獲取到這10條消息。

每個(gè)消費(fèi)者組內(nèi)可以有多個(gè)消費(fèi)者消費(fèi),消息會(huì)平均分?jǐn)偨o各個(gè)消費(fèi)者,例如:stream有10條消息,消費(fèi)者A,B,C同時(shí)在同一個(gè)組內(nèi)消費(fèi),A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

創(chuàng)建消費(fèi)者組:

#消費(fèi)消息首先得創(chuàng)建消費(fèi)者組
# 表示為隊(duì)列 stream1 創(chuàng)建一個(gè)消費(fèi)者組 group1 從消息id=0(第一條消息)開(kāi)始讀取消息
xgroup create stream1 group1 0

#查詢(xún)stream1內(nèi)的所有消費(fèi)者組信息
xinfo groups stream1

xreadgroup 消費(fèi)消息

通過(guò)xreadgroup可以在消費(fèi)者組內(nèi)創(chuàng)建消費(fèi)者消費(fèi)消息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#創(chuàng)建消費(fèi)者讀取消息
#在group1消費(fèi)者組內(nèi)通過(guò)consumer1消費(fèi)stream1內(nèi)的消息,消費(fèi)1條未分配的消息 (> 表示未分配過(guò)消費(fèi)者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 > 

Pending 等待列表

通過(guò) xreadgroup 讀取消息時(shí)消息會(huì)分配給對(duì)應(yīng)的消費(fèi)者,每個(gè)消費(fèi)者內(nèi)都維護(hù)了一個(gè)Pending列表用于保存接收到的消息,當(dāng)消息ack后會(huì)從pending列表內(nèi)移除,也就是說(shuō)pending列表內(nèi)維護(hù)的是所有未ack的消息id

每個(gè)Pending的消息有4個(gè)屬性:

  • 消息ID
  • 所屬消費(fèi)者
  • IDLE,已讀取時(shí)長(zhǎng)
  • delivery counter,消息被讀取次數(shù)

XPENDING key group [start end count] [consumer]

#查看pending列表
# 查看group1組內(nèi)的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1組內(nèi)的所有消費(fèi)者pending類(lèi)表
xpending stream1 group1 - + 10 

消息確認(rèn)

當(dāng)消費(fèi)者消費(fèi)了消息,需要通過(guò) xack 命令確認(rèn)消息,xack后的消息會(huì)從pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

消息轉(zhuǎn)移

當(dāng)消費(fèi)者接收到消息卻不能正確消費(fèi)時(shí)(報(bào)錯(cuò)或其他原因),可以使用 XCLAIM 將消息轉(zhuǎn)移給其他消費(fèi)者消費(fèi),需要設(shè)置組、轉(zhuǎn)移的目標(biāo)消費(fèi)者和消息ID,同時(shí)需要提供IDLE(已被讀取時(shí)長(zhǎng)),只有超過(guò)這個(gè)時(shí)長(zhǎng),才能被轉(zhuǎn)移。

通過(guò)xclaim轉(zhuǎn)移的消息只是將消息移入另一個(gè)消費(fèi)者的pending列表,消費(fèi)者并不能通過(guò)xreadgroup讀取到消息,只能通過(guò)xpending讀取到。

# 表示將ID為 1553585533795-1 的消息轉(zhuǎn)移到消費(fèi)者B消費(fèi),前提是消費(fèi)
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

信息監(jiān)控

redis提供了xinfo來(lái)查看stream的信息

#查看sream信息
xinfo stream steam1
#查詢(xún)消費(fèi)者組信息
xinfo groups group1 

#查詢(xún)消費(fèi)者信息
xinfo consumers consumer1

SpringBoot 整合

1 引入依賴(lài)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 編寫(xiě)消費(fèi)者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    public final String streamName      = "emailStream";
    public final String groupName       = "emailGroup";
    public final String consumerName    = "emailConsumer";


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        //log.info("stream名稱(chēng)-->{}",message.getStream());
        //log.info("消息ID-->{}",message.getId());
        log.info("消息內(nèi)容-->{}",message.getValue());

        Map<String, String> msgMap = message.getValue();

        if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
            //消費(fèi)異常導(dǎo)致未能ack時(shí),消息會(huì)進(jìn)入pending列表,我們可以啟動(dòng)定時(shí)任務(wù)來(lái)讀取pending列表處理失敗的任務(wù)
            log.info("消費(fèi)異常-->"+message);
           return;
        }

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //消息應(yīng)答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
	//我們可以啟動(dòng)定時(shí)任務(wù)不斷監(jiān)聽(tīng)pending列表,處理死信消息
}

3 配置redis

序列化配置

@EnableCaching
@Configuration
public class RedisConfig {


    /**
     * 設(shè)置redis序列化規(guī)則
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        return jackson2JsonRedisSerializer;
    }

    /**
     * RedisTemplate配置
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // key序列化
        redisTemplate.setKeySerializer(stringSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash key序列化
        redisTemplate.setHashKeySerializer(stringSerializer);
        // Hash value序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

消費(fèi)者組和消費(fèi)者配置

@Slf4j
@Configuration
public class RedisStreamConfig {

    @Autowired
    private EmailConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block讀取超時(shí)時(shí)間
                .pollTimeout(Duration.ofSeconds(3))
                //count 數(shù)量(一次只獲取一條消息)
                .batchSize(1)
                //序列化規(guī)則
                .serializer( stringRedisSerializer )
                .build();
    }

    /**
     * 開(kāi)啟監(jiān)聽(tīng)器接收消息
     */
    @Bean
    public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 創(chuàng)建 stream 流
        if( !redisTemplate.hasKey(emailConsumer.streamName)){
            redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",emailConsumer.streamName);
        }

        //創(chuàng)建消費(fèi)者組
        try {
            redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
        } catch (Exception e) {
            log.info("消費(fèi)者組 {} 已存在",emailConsumer.groupName);
        }

        //注冊(cè)消費(fèi)者 消費(fèi)者名稱(chēng),從哪條消息開(kāi)始消費(fèi),消費(fèi)者類(lèi)
        // > 表示沒(méi)消費(fèi)過(guò)的消息
        // $ 表示最新的消息
        listenerContainer.receive(
            Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
            StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
            emailConsumer
        );


        listenerContainer.start();
        return listenerContainer;
    }

}

4.生產(chǎn)者生產(chǎn)消息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){

    StreamOperations streamOperations = redisTemplate.opsForStream();

    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);

        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add("emailStream",msgMap);
    }
    return "success";
}

參考文檔:

redis Stream 消息隊(duì)列

SpringBoot整合redis stream 實(shí)現(xiàn)消息隊(duì)列

到此這篇關(guān)于redis stream 實(shí)現(xiàn)消息隊(duì)列的實(shí)踐的文章就介紹到這了,更多相關(guān)redis stream 消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis 中的布隆過(guò)濾器的實(shí)現(xiàn)

    Redis 中的布隆過(guò)濾器的實(shí)現(xiàn)

    這篇文章主要介紹了Redis 中的布隆過(guò)濾器的實(shí)現(xiàn),詳細(xì)的介紹了什么是布隆過(guò)濾器以及如何實(shí)現(xiàn),非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2018-10-10
  • Redis高效檢索地理位置的原理解析

    Redis高效檢索地理位置的原理解析

    這篇文章主要介紹了Redis是如何高效檢索地理位置,通過(guò)geo相關(guān)的命令,可以很容易在redis中存儲(chǔ)和使用經(jīng)緯度坐標(biāo)信息,具體實(shí)現(xiàn)方法跟隨小編一起看看吧
    2021-06-06
  • 一次關(guān)于Redis內(nèi)存詭異增長(zhǎng)的排查過(guò)程實(shí)戰(zhàn)記錄

    一次關(guān)于Redis內(nèi)存詭異增長(zhǎng)的排查過(guò)程實(shí)戰(zhàn)記錄

    這篇文章主要給大家分享了一次關(guān)于Redis內(nèi)存詭異增長(zhǎng)的排查過(guò)程實(shí)戰(zhàn)記錄,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2018-07-07
  • redis中opsForList().range()的使用方法詳解

    redis中opsForList().range()的使用方法詳解

    這篇文章主要給大家介紹了關(guān)于redis中opsForList().range()的使用方法,文中通過(guò)實(shí)例代碼以及圖文介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2023-03-03
  • Redis高可用集群redis-cluster詳解

    Redis高可用集群redis-cluster詳解

    redis?cluster?是redis官方提供的分布式解決方案,在3.0版本后推出的,有效地解決了redis分布式的需求,當(dāng)一個(gè)redis節(jié)點(diǎn)掛了可以快速的切換到另一個(gè)節(jié)點(diǎn),對(duì)redis-cluster高可用集群相關(guān)知識(shí)感興趣的朋友一起看看吧
    2022-03-03
  • 淺談Redis阻塞的9種情況

    淺談Redis阻塞的9種情況

    本文主要介紹了淺談Redis阻塞的9種情況,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • Redis中的慢日志

    Redis中的慢日志

    這篇文章主要介紹了Redis中的慢日志,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • Linux服務(wù)器安裝redis數(shù)據(jù)庫(kù)圖文教程

    Linux服務(wù)器安裝redis數(shù)據(jù)庫(kù)圖文教程

    Redis是一個(gè)開(kāi)源的使用ANSI C語(yǔ)言編寫(xiě)、遵守BSD協(xié)議、支持網(wǎng)絡(luò)、可基于內(nèi)存亦可持久化的日志型、Key-Value數(shù)據(jù)庫(kù),并提供多種語(yǔ)言的API。這篇文章主要介紹了Linux服務(wù)器安裝redis數(shù)據(jù)庫(kù)圖文教程,需要的朋友可以參考下
    2018-03-03
  • Redis壓縮列表的設(shè)計(jì)與實(shí)現(xiàn)

    Redis壓縮列表的設(shè)計(jì)與實(shí)現(xiàn)

    壓縮列表(Ziplist)是 Redis 為了節(jié)省內(nèi)存而設(shè)計(jì)的一種緊湊型數(shù)據(jù)結(jié)構(gòu),主要用于存儲(chǔ)長(zhǎng)度較短且數(shù)量較少的元素集合,本文給大家介紹了Redis壓縮列表的設(shè)計(jì)與實(shí)現(xiàn),文中通過(guò)代碼示例講解的非常詳細(xì),需要的朋友可以參考下
    2024-08-08
  • Redis實(shí)現(xiàn)單設(shè)備登錄的場(chǎng)景分析

    Redis實(shí)現(xiàn)單設(shè)備登錄的場(chǎng)景分析

    這篇文章主要介紹了Redis實(shí)現(xiàn)單設(shè)備登錄,用戶(hù)首次登錄時(shí),將用戶(hù)信息存入Redis,key是用戶(hù)id,value是token,當(dāng)用戶(hù)在其他設(shè)備登錄時(shí),會(huì)重新生成token,這個(gè)時(shí)候原先的token已經(jīng)被覆蓋了,本文給大家提供樣例及核心代碼,感興趣的朋友參考下吧
    2022-04-04

最新評(píng)論