欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼

 更新時(shí)間:2024年08月23日 09:19:32   作者:少年醬105974  
Redis Stream 是 Redis 5.0 引入的一種數(shù)據(jù)結(jié)構(gòu),用于處理日志類型的數(shù)據(jù),它提供了高效、可靠的方式來處理和存儲(chǔ)時(shí)間序列數(shù)據(jù),如事件、消息等,本文介紹了SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列,需要的朋友可以參考下

引言

Redis Stream 是 Redis 5.0 引入的一種數(shù)據(jù)結(jié)構(gòu),用于處理日志類型的數(shù)據(jù)。它提供了高效、可靠的方式來處理和存儲(chǔ)時(shí)間序列數(shù)據(jù),如事件、消息等。其設(shè)計(jì)靈感源于 Kafka 和類似的消息隊(duì)列系統(tǒng),且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。

依賴

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-data-redis</artifactId>  
</dependency>

說明:此部分定義了 Redis 相關(guān)的依賴,確保項(xiàng)目能夠引入并使用 Spring Boot 提供的 Redis 啟動(dòng)器。

RedisTemplate 配置

package com.mjg.config;  

import com.fasterxml.jackson.annotation.JsonAutoDetect;  
import com.fasterxml.jackson.annotation.PropertyAccessor;  
import com.fasterxml.jackson.databind.ObjectMapper;  
import com.fasterxml.jackson.databind.SerializationFeature;  
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.data.redis.connection.RedisConnectionFactory;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;  
import org.springframework.data.redis.serializer.StringRedisSerializer;  

@Configuration  
public class RedisConfig {  

    @Bean  
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {  
        RedisTemplate<String, Object> template = new RedisTemplate<>();  
        template.setConnectionFactory(connectionFactory);  
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);  
        ObjectMapper om = new ObjectMapper();  
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);  
//        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);  
        // 注冊(cè) Java 8 日期時(shí)間模塊  
        om.registerModule(new JavaTimeModule());  
        om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);  
        om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);  
        jackson2JsonRedisSerializer.serialize(om);  
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();  
        // key 采用 String 的序列化方式  
        template.setKeySerializer(stringRedisSerializer);  
        // hash 的 key 也采用 String 的序列化方式  
        template.setHashKeySerializer(stringRedisSerializer);  
        // value 序列化方式采用 jackson  
        template.setValueSerializer(jackson2JsonRedisSerializer);  
        // hash 的 value 序列化方式采用 jackson  
        template.setHashValueSerializer(jackson2JsonRedisSerializer);  
        template.afterPropertiesSet();  
        return template;  
    }  
}

說明:此配置類用于設(shè)置 RedisTemplate 的序列化方式,以滿足不同數(shù)據(jù)類型的存儲(chǔ)和讀取需求。

RedisStreamConfig

package com.mjg.config;  

import cn.hutool.core.convert.Convert;  
import cn.hutool.core.util.StrUtil;  
import lombok.RequiredArgsConstructor;  
import lombok.SneakyThrows;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.beans.factory.DisposableBean;  
import org.springframework.beans.factory.InitializingBean;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.data.redis.connection.RedisConnectionFactory;  
import org.springframework.data.redis.connection.RedisServerCommands;  
import org.springframework.data.redis.connection.stream.Consumer;  
import org.springframework.data.redis.connection.stream.ObjectRecord;  
import org.springframework.data.redis.connection.stream.ReadOffset;  
import org.springframework.data.redis.connection.stream.StreamOffset;  
import org.springframework.data.redis.core.RedisCallback;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.core.StreamOperations;  
import org.springframework.data.redis.stream.StreamListener;  
import org.springframework.data.redis.stream.StreamMessageListenerContainer;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
import org.springframework.util.Assert;  

import java.net.InetAddress;  
import java.time.Duration;  
import java.util.Properties;  

@Slf4j  
@RequiredArgsConstructor  
@Configuration  
public class RedisStreamConfig implements InitializingBean, DisposableBean {  
    private final RedisTemplate<String, Object> redisTemplate;  

    public static String streamName = "user-event-stream";  
    public static String userEventGroup = "user-event-group";  
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;  

    /**  
     * 消息偵聽器容器,用于監(jiān)聽 Redis Stream 中的消息  
     *  
     * @param connectionFactory Redis 連接工廠,用于創(chuàng)建 Redis 連接  
     * @param messageConsumer   消息消費(fèi)者,用于處理接收到的消息  
     * @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 類型的消息偵聽器容器  
     */  
    @Bean  
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {  
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer);  
        listenerContainer.start();  
        return listenerContainer;  
    }  

    /**  
     * 創(chuàng)建一個(gè)流容器,用于監(jiān)聽 Redis Stream 中的數(shù)據(jù)  
     *  
     * @param streamName        Redis Stream 的名稱  
     * @param connectionFactory Redis 連接工廠  
     * @param streamListener    綁定的監(jiān)聽類  
     * @return 返回 StreamMessageListenerContainer 對(duì)象  
     */  
    @SneakyThrows  
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) {  
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =  
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions  
                      .builder()  
                      .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超時(shí)時(shí)間  
                      .batchSize(10) // 批量抓取消息  
                      .targetType(String.class) // 傳遞的數(shù)據(jù)類型  
                      .executor(threadPoolTaskExecutor)  
                      .build();  
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer  
              .create(connectionFactory, options);  
        // 指定消費(fèi)最新的消息  
        StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed());  
        // 創(chuàng)建消費(fèi)者  
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener);  
        // 指定消費(fèi)者對(duì)象  
        container.register(streamReadRequest, streamListener);  
        return container;  
    }  

    /**  
     * 生成流讀取請(qǐng)求  
     *  
     * @param offset         偏移量,用于指定從 Redis Stream 中的哪個(gè)位置開始讀取消息  
     * @param streamListener 流偵聽器,用于處理接收到的消息  
     * @return 返回一個(gè) StreamReadRequest 對(duì)象,表示一個(gè)流讀取請(qǐng)求  
     * @throws Exception 當(dāng) streamListener 無法識(shí)別為 MessageConsumer 類型時(shí),拋出異常  
     */  
    private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception {  
        Consumer consumer;  
        if (streamListener instanceof MessageConsumer) {  
            consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName());  
        } else {  
            throw new Exception("無法識(shí)別的 stream key");  
        }  
        // 關(guān)閉自動(dòng) ack 確認(rèn)  
        return StreamMessageListenerContainer.StreamReadRequest.builder(offset)  
              .errorHandler((error) -> {  
                    log.error(error.getMessage());  
                })  
              .cancelOnError(e -> false)  
              .consumer(consumer)  
                // 關(guān)閉自動(dòng) ack 確認(rèn)  
              .autoAcknowledge(false)  
              .build();  
    }  

    /**  
     * 檢查 Redis 版本是否符合要求  
     *  
     * @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,拋出該異常  
     */  
    private void checkRedisVersion() {  
        // 獲得 Redis 版本  
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);  
        Assert.notNull(info, "Redis info is null");  
        Object redisVersion = info.get("redis_version");  
        Integer anInt = Convert.toInt(redisVersion);  
        if (anInt < 5) {  
            throw new IllegalStateException(StrUtil.format("您當(dāng)前的 Redis 版本為 {},小于最低要求的 5.0.0 版本!", redisVersion));  
        }  
    }  

    @Override  
    public void destroy() throws Exception {  
    }  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        checkRedisVersion();  
        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();  
        if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) {  
            streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup);  
        }  
    }  
}

說明:該配置類實(shí)現(xiàn)了對(duì) Redis Stream 的相關(guān)配置,包括消息監(jiān)聽容器的創(chuàng)建、流讀取請(qǐng)求的生成、Redis 版本的檢查以及組的創(chuàng)建等功能。

生產(chǎn)者

package com.mjg.config;  

import lombok.RequiredArgsConstructor;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.data.redis.connection.stream.RecordId;  
import org.springframework.data.redis.connection.stream.StreamRecords;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.stereotype.Component;  

import java.util.Collections;  

@Component  
@RequiredArgsConstructor  
@Slf4j  
public class MessageProducer {  

    private final RedisTemplate<String, Object> redisTemplate;  

    public void sendMessage(String streamKey, Object message) {  
        RecordId recordId = redisTemplate  
              .opsForStream().add(StreamRecords.newRecord()  
                      .ofMap(Collections.singletonMap("data", message))  
                      .withStreamKey(streamKey));  
        if (recordId!= null) {  
            log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId);  
        }  
    }  
}

說明:MessageProducer 類負(fù)責(zé)向 Redis Stream 發(fā)送消息。

消費(fèi)者

package com.mjg.config;  

import lombok.RequiredArgsConstructor;  
import org.springframework.data.redis.connection.stream.ObjectRecord;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.data.redis.stream.StreamListener;  
import org.springframework.stereotype.Component;  

@RequiredArgsConstructor  
@Component  
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {  

    private final RedisTemplate<String, Object> redisTemplate;  

    @Override  
    public void onMessage(ObjectRecord<String, String> message) {  
        String stream = message.getStream();  
        String messageId = message.getId().toString();  
        String messageBody = message.getValue();  

        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);  
        System.out.println("Message body: " + messageBody);  

//        消息應(yīng)答  
        redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId());  
    }  
}

說明:MessageConsumer 類實(shí)現(xiàn)了 StreamListener 接口,用于處理從 Redis Stream 接收到的消息,并進(jìn)行相應(yīng)的應(yīng)答操作。

測(cè)試

@RequiredArgsConstructor  
@Slf4j  
@RestController  
public class MessageController {  
    public static String streamName = "user-event-stream";  
    private final MessageProducer messageProducer;  

    @GetMapping("/send")  
    public void send() {  
        messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now());  
    }  
}

說明:MessageController 類中的 send 方法通過調(diào)用 MessageProducer 來發(fā)送消息到指定的 Redis Stream 中。

以上就是SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis Stream輕量消息隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java控制臺(tái)輸出版多人聊天室

    java控制臺(tái)輸出版多人聊天室

    這篇文章主要為大家詳細(xì)介紹了java控制臺(tái)輸出版多人聊天室,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • 詳解JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載

    詳解JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載

    本篇文章主要介紹了JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載,具有一定的參考價(jià)值,有興趣的可以了解一下
    2017-08-08
  • Fastjson反序列化隨機(jī)性失敗示例詳解

    Fastjson反序列化隨機(jī)性失敗示例詳解

    這篇文章主要為大家介紹了Fastjson反序列化隨機(jī)性失敗示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Java中的FutureTask源碼解析

    Java中的FutureTask源碼解析

    這篇文章主要介紹了Java中的FutureTask源碼解析,FutureTask是一個(gè)可取消的異步計(jì)算,這個(gè)類是Future的實(shí)現(xiàn)類,有開始和取消一個(gè)計(jì)算的方法,如果一個(gè)計(jì)算已經(jīng)完成可以查看結(jié)果,需要的朋友可以參考下
    2023-12-12
  • Java pdu短信解碼全面解析

    Java pdu短信解碼全面解析

    本文是根據(jù)python的方法改寫的pdu短信解碼,非常不錯(cuò),代碼簡(jiǎn)單易懂具有參考借鑒價(jià)值,感興趣的朋友一起看看吧
    2016-10-10
  • Spring中ApplicationListener的使用解析

    Spring中ApplicationListener的使用解析

    這篇文章主要介紹了Spring中ApplicationListener的使用解析,ApplicationContext事件機(jī)制是觀察者設(shè)計(jì)模式的實(shí)現(xiàn),通過ApplicationEvent類和ApplicationListener接口,需要的朋友可以參考下
    2023-12-12
  • 老生常談Java中instanceof關(guān)鍵字的理解

    老生常談Java中instanceof關(guān)鍵字的理解

    java 中的instanceof 運(yùn)算符是用來在運(yùn)行時(shí)指出對(duì)象是否是特定類的一個(gè)實(shí)例。這篇文章主要介紹了老生常談Java中instanceof關(guān)鍵字的理解,需要的朋友可以參考下
    2018-10-10
  • 詳談hibernate,jpa與spring?data?jpa三者之間的關(guān)系

    詳談hibernate,jpa與spring?data?jpa三者之間的關(guān)系

    這篇文章主要介紹了hibernate,jpa與spring?data?jpa三者之間的關(guān)系,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件

    Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件

    這篇文章主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下
    2025-01-01
  • Spring MVC過濾器-登錄過濾的代碼實(shí)現(xiàn)

    Spring MVC過濾器-登錄過濾的代碼實(shí)現(xiàn)

    本篇文章主要介紹了Spring MVC過濾器-登錄過濾,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧。
    2017-01-01

最新評(píng)論