欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot中實現(xiàn)Redis?Stream隊列的代碼實例

 更新時間:2024年09月12日 10:14:19   作者:保加利亞的風(fēng)  
本文介紹了如何在Spring?Boot中使用Redis?Stream隊列進(jìn)行消息的生產(chǎn)和消費(fèi),涉及到的主要內(nèi)容包括添加Redis依賴、配置RedisTemplate、創(chuàng)建生產(chǎn)者和消費(fèi)者監(jiān)聽器等,需要的朋友可以參考下

前言

簡單實現(xiàn)一下在SpringBoot中操作Redis Stream隊列的方式,監(jiān)聽隊列中的消息進(jìn)行消費(fèi)。

  • jdk:1.8
  • springboot-version:2.6.3
  • redis:5.0.1(5版本以上才有Stream隊列)

準(zhǔn)備工作

1、pom

redis 依賴包(version 2.6.3)

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2、 yml

spring: 
  redis:
    database: 0
    host: 127.0.0.1

3、 RedisStreamUtil工具類

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class RedisStreamUtil {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 創(chuàng)建消費(fèi)組
	 *
	 * @param key   鍵名稱
	 * @param group 組名稱
	 * @return {@link String}
	 */
	public String oup(String key, String group) {
		return redisTemplate.opsForStream().createGroup(key, group);
	}

	/**
	 * 獲取消費(fèi)者信息
	 *
	 * @param key   鍵名稱
	 * @param group 組名稱
	 * @return {@link StreamInfo.XInfoConsumers}
	 */
	public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
		return redisTemplate.opsForStream().consumers(key, group);
	}

	/**
	 * 查詢組信息
	 *
	 * @param key 鍵名稱
	 * @return
	 */
	public StreamInfo.XInfoGroups queryGroups(String key) {
		return redisTemplate.opsForStream().groups(key);
	}

	// 添加Map消息
	public String addMap(String key, Map<String, Object> value) {
		return redisTemplate.opsForStream().add(key, value).getValue();
	}

	// 讀取消息
	public List<MapRecord<String, Object, Object>> read(String key) {
		return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
	}

	// 確認(rèn)消費(fèi)
	public Long ack(String key, String group, String... recordIds) {
		return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
	}

	// 刪除消息。當(dāng)一個節(jié)點的所有消息都被刪除,那么該節(jié)點會自動銷毀
	public Long del(String key, String... recordIds) {
		return redisTemplate.opsForStream().delete(key, recordIds);
	}

	// 判斷是否存在key
	public boolean hasKey(String key) {
		Boolean aBoolean = redisTemplate.hasKey(key);
		return aBoolean != null && aBoolean;
	}
}

代碼實現(xiàn)

生產(chǎn)者發(fā)送消息

生產(chǎn)者發(fā)送消息,在Service層創(chuàng)建addMessage方法,往隊列中發(fā)送消息。

代碼中addMap()方法第一個參數(shù)為key,第二個參數(shù)為value,該key要和后續(xù)配置的保持一致,暫時先記住這個key。

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * 發(fā)送一個消息
     *
     * @return {@code Object}
     */
    @Override
    public Object addMessage() {
        RedisUser redisUser = new RedisUser();
        redisUser.setAge(18);
        redisUser.setName("hcr");
        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();
        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);
        return recordId;
    }
}

controller接口方法

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")
    public Object addMessage() {
        return redisStreamMqService.addMessage();
    }
}

調(diào)用測試,查看redis中是否正常添加數(shù)據(jù)。

接口返回數(shù)據(jù)

1702622585248-0

查看redis中的數(shù)據(jù)

消費(fèi)者監(jiān)聽消息進(jìn)行消費(fèi)

創(chuàng)建RedisConsumersListener監(jiān)聽器

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**
     * 監(jiān)聽器
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamKey = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息內(nèi)容
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //處理邏輯

        //邏輯處理完成后,ack消息,刪除消息,group為消費(fèi)組名稱
        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
        redisStreamUtil.del(streamKey, recordId.getValue());
    }
}

創(chuàng)建RedisConfig配置類,配置監(jiān)聽

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@Slf4j
public class RedisConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * redis序列化
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多獲取多少條消息
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();
        
        //該key和group可根據(jù)需求自定義配置
        String streamName = "mystream";
        String groupname = "mygroup";

        initStream(streamName, groupname);
        var listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // 手動ask消息
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
        // 自動ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        listenerContainer.start();
        return subscription;
    }

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //創(chuàng)建主題
            String result = redisStreamUtil.addMap(key, map);
            //創(chuàng)建消費(fèi)組
            redisStreamUtil.oup(key, group);
            //將初始化的值刪除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}

redisTemplate:該bean用于配置redis序列化

subscription:配置監(jiān)聽

initStream:初始化消費(fèi)組

監(jiān)聽測試

使用addMessage()方法投送一條消息后,查看控制臺輸出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

總結(jié)

以上就是在SpringBoot中簡單實現(xiàn)Redis Stream隊列的Demo,如有需要源碼或者哪里不清楚的請評論或者發(fā)送私信。
Template:該bean用于配置redis序列化

subscription:配置監(jiān)聽

initStream:初始化消費(fèi)組

到此這篇關(guān)于SpringBoot中實現(xiàn)Redis Stream隊列的文章就介紹到這了,更多相關(guān)SpringBoot實現(xiàn)Redis Stream隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java并發(fā) synchronized鎖住的內(nèi)容解析

    Java并發(fā) synchronized鎖住的內(nèi)容解析

    這篇文章主要介紹了Java并發(fā) synchronized鎖住的內(nèi)容解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-09-09
  • springboot在filter中如何用threadlocal存放用戶身份信息

    springboot在filter中如何用threadlocal存放用戶身份信息

    這篇文章主要介紹了springboot中在filter中如何用threadlocal存放用戶身份信息,本文章主要描述通過springboot的filter類,在過濾器中設(shè)置jwt信息進(jìn)行身份信息保存的方法,需要的朋友可以參考下
    2024-07-07
  • Java中的原生post請求方式

    Java中的原生post請求方式

    這篇文章主要介紹了Java中的原生post請求方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • SpringBoot+React實現(xiàn)計算個人所得稅

    SpringBoot+React實現(xiàn)計算個人所得稅

    本文將以個人所得稅的計算為例,使用React+SpringBoot+GcExcel來實現(xiàn)這一功能,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價值,感興趣的小伙伴可以了解下
    2023-09-09
  • Spring實戰(zhàn)之SpEl語法實例詳解

    Spring實戰(zhàn)之SpEl語法實例詳解

    這篇文章主要介紹了Spring實戰(zhàn)之SpEl語法,結(jié)合實例形式分析了SpEl創(chuàng)建數(shù)組、集合及解析變量等相關(guān)操作原理與實現(xiàn)技巧,需要的朋友可以參考下
    2019-12-12
  • SpringBoot 自動配置原理及源碼解析

    SpringBoot 自動配置原理及源碼解析

    SpringBoot 在項目啟動的時候封裝了創(chuàng)建對象的方法,無需我們手動配置,接下來通過本文給大家介紹SpringBoot 自動配置原理解析及源碼展示,感興趣的朋友一起看看吧
    2021-06-06
  • SpringBoot項目設(shè)置斷點debug調(diào)試無效忽略web.xml問題的解決

    SpringBoot項目設(shè)置斷點debug調(diào)試無效忽略web.xml問題的解決

    這篇文章主要介紹了SpringBoot項目設(shè)置斷點debug調(diào)試無效忽略web.xml問題的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Java編程利用socket多線程訪問服務(wù)器文件代碼示例

    Java編程利用socket多線程訪問服務(wù)器文件代碼示例

    這篇文章主要介紹了Java編程利用socket多線程訪問服務(wù)器文件代碼示例,具有一定參考價值,需要的朋友可以了解下。
    2017-10-10
  • Java的@Transactional、@Aysnc、事務(wù)同步問題詳解

    Java的@Transactional、@Aysnc、事務(wù)同步問題詳解

    這篇文章主要介紹了Java的@Transactional、@Aysnc、事務(wù)同步問題詳解,現(xiàn)在我們需要在一個業(yè)務(wù)方法中插入一個用戶,這個業(yè)務(wù)方法我們需要加上事務(wù),然后插入用戶后,我們要異步的方式打印出數(shù)據(jù)庫中所有存在的用戶,需要的朋友可以參考下
    2023-11-11
  • Java中使用byte[]獲取16位字符串的技巧分享

    Java中使用byte[]獲取16位字符串的技巧分享

    在Java編程中,處理字符編碼和字節(jié)數(shù)組時,尤其是從??byte[]???數(shù)組中獲取字符串,可能會遇到字符集的復(fù)雜性問題,本文將介紹如何從一個??byte[]??數(shù)組中正確獲取16位字符串,需要的朋友可以參考下
    2024-08-08

最新評論