欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot3+Redis實現(xiàn)消息隊列的多種方法小結(jié)

 更新時間:2025年03月20日 09:41:33   作者:jolly_xu  
本文主要介紹了Springboot3+Redis實現(xiàn)消息隊列的多種方法小結(jié),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

Redis實現(xiàn)消息隊列和RabbitMQ的優(yōu)缺點

Redis實現(xiàn)消息隊列的優(yōu)點:

  • 性能高:Redis是內(nèi)存數(shù)據(jù)庫,讀寫速度快,適合高并發(fā)的消息推送。
  • 數(shù)據(jù)結(jié)構(gòu)豐富:Redis支持多種數(shù)據(jù)結(jié)構(gòu),如列表(list)、集合(set)、有序集合(zset)等,可以實現(xiàn)多種消息隊列模式。
  • 易用性:Redis的命令簡單,學(xué)習(xí)成本低,易于上手。
  • 持久化:Redis支持RDB和AOF持久化,能夠保證數(shù)據(jù)的安全。
  • 分布式支持:Redis支持主從復(fù)制、哨兵和集群,可以實現(xiàn)高可用和分布式架構(gòu)。

Redis實現(xiàn)消息隊列的缺點:

  • 功能有限:相比于專業(yè)的消息隊列中間件,Redis的消息隊列功能相對簡單,不支持復(fù)雜的消息路由、事務(wù)、消息優(yōu)先級等特性。
  • 消息可靠性:Redis沒有內(nèi)置的消息確認(rèn)機制,需要自行實現(xiàn),可能會增加開發(fā)復(fù)雜度。
  • 數(shù)據(jù)量限制:受內(nèi)存限制,Redis不適合存儲大量消息。
  • 消息持久化問題:雖然Redis支持持久化,但在大數(shù)據(jù)量情況下,持久化可能會影響性能。

RabbitMQ的優(yōu)點:

  • 功能強大:RabbitMQ是一個專業(yè)的消息隊列中間件,支持消息持久化、事務(wù)、消息優(yōu)先級、延遲消息、死信隊列等高級特性。
  • 高可用性:RabbitMQ支持鏡像隊列,可以實現(xiàn)高可用架構(gòu)。
  • 靈活的路由:RabbitMQ支持多種交換機類型(direct, topic, headers, fanout),可以實現(xiàn)復(fù)雜的消息路由。
  • 客戶端支持廣泛:RabbitMQ有多種語言客戶端,方便集成。
  • 社區(qū)活躍:RabbitMQ有活躍的社區(qū),問題解決速度快。

RabbitMQ的缺點:

  • 性能相對較低:相比于Redis,RabbitMQ的性能略低,尤其是在高并發(fā)場景下。
  • 資源消耗:RabbitMQ需要更多的系統(tǒng)資源,如CPU和內(nèi)存。
  • 復(fù)雜性:RabbitMQ的概念模型較為復(fù)雜,學(xué)習(xí)曲線較陡峭

Redis適合于需要高速讀寫、輕量級消息隊列的場景,如果業(yè)務(wù)對消息隊列的功能要求不高,且已經(jīng)使用了Redis,可以考慮使用Redis實現(xiàn)消息隊列。其他情況下還是建議使用RabbitMQ

1.Spring Data Redis

這是Spring框架提供的一個用于簡化Redis操作的模塊。

初始準(zhǔn)備

1.1首先配置Pom依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

1.2 在yml中配置redis相關(guān)信息

由于spring.redis.host 這種配置已經(jīng)被棄用,在新版的springboot中,需要在spring.data.redis.host 進行配置

spring:
  data:
    redis:
    # 改為自己的地址和密碼
      host: 10.69.37.213
      port: 6379
      password: Jolly
      # 連接超時時間,單位ms
      connect-timeout: 50000
      # 選擇第幾個數(shù)據(jù)庫,默認(rèn)為0,最大值15
      database: 0
      lettuce:
        pool:
          # 最大的活躍連接數(shù),不會超過這個數(shù),根據(jù)項目預(yù)期并發(fā)量調(diào)整
          max-active: 50
          # max-idle 指定了連接池中的最大空閑連接數(shù)。
          # 空閑連接是指那些沒有被使用,但是已經(jīng)創(chuàng)建并且保持在連接池中的連接
          # 這個值應(yīng)該與max-active相匹配或者稍微低一些,
          # 以保持連接池中有足夠的空閑連接來處理突發(fā)請求。
          # 設(shè)置得太高可能會導(dǎo)致資源浪費,因為空閑連接會占用內(nèi)存和其他資源。
          max-idle: 30
          #這個配置指定了連接池中的最小空閑連接數(shù)。
          #這個設(shè)置可以確保連接池始終保持一定數(shù)量的空閑連接,以便在請求到來時可以立即使用,而不需要等待連接的創(chuàng)建。
          # 這個值應(yīng)該根據(jù)你的應(yīng)用程序的基線負(fù)載來設(shè)置
          min-idle: 10
          # 當(dāng)連接池達到最大活躍連接數(shù)時,客戶端等待可用連接的最大時間(以毫秒為單位)。-1 表示無限等待
          # 如果設(shè)置為一個正數(shù),那么在等待時間超過這個值后,會拋出一個異常。
          max-wait: -1

1.3 設(shè)置redis的序列化

為了防止存入到redis的數(shù)據(jù)出現(xiàn)亂碼的情況,進行序列化的設(shè)置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

@Configuration
public class redisConfig {

	@ConditionalOnMissingBean(name = "redisTemplate")
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // 默認(rèn)為utf-8,可以進行修改
        template.setKeySerializer(new StringRedisSerializer());
        // 原版默認(rèn)使用jdk的序列化方式JdkSerializationRedisSerializer
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
        template.setValueSerializer(serializer);
        // 設(shè)置Hash的序列化化方式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);
        // 設(shè)置屬性
        template.afterPropertiesSet();
        return template;
    }
}

2.Redis實現(xiàn)消息隊列的方式

2.1 使用Redis的List實現(xiàn)消息隊列

首先構(gòu)造一個簡單的訂單類,用于后面消息隊列測試

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class order implements Serializable {
    private int id;
    private String userid;
    private String goodName;
}

我們使用最簡單的方式來實現(xiàn)消息隊列,直接不斷輪詢List中是否有消息

import jakarta.annotation.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Mq {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

	// 隊列名
    private final String queue = "order_queue";

    @GetMapping("/order")
    public void order(){
       	// 為了模擬消息的獲取,異步開啟一個線程,進行消息處理
        Thread thread = new Thread(() -> {
            try {
                processData();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        thread.start();
         // 模擬產(chǎn)生10條消息,放入隊列
        for (int i = 0; i < 10; i++) {
            order order = new order(i, i, "goods" + i);
            redisTemplate.opsForList().leftPush(queue, order);
            System.out.println("放入消息隊列:"+i);
        }
    }

    // 處理消息,不斷的輪詢隊列中的消息
    public void processData() throws InterruptedException {
        while (true){
            Object order = redisTemplate.opsForList().rightPop(queue);
            if(order == null){
                System.out.println("當(dāng)前沒有消息");
                Thread.sleep(1000);
            }else{
                System.out.println("處理消息:"+order);
            }
        }
    }
}

這種方式是最簡單的方式,但是不推薦,因為一直輪詢是會浪費CPU資源的,拉低服務(wù)端的性能。

2.2 消息訂閱模式

Redis 支持消息隊列的一種模式是通過其發(fā)布訂閱(Publish/Subscribe)功能。這種模式允許客戶端訂閱一個或多個頻道(channel),并接收發(fā)送到這些頻道的消息。

2.2.1 發(fā)布消息

這一步是比較簡單的,直接調(diào)用方法即可.

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private final String CHANNEL_NAME  = "order_queue";
    
	@GetMapping("/order")
    public void order(){
        // 模擬產(chǎn)生10條消息,放入隊列
        for (int i = 0; i < 10; i++) {
            order order = new order(i, i, "goods" + i);
            //發(fā)布消息
            redisTemplate.convertAndSend(CHANNEL_NAME, order);
            System.out.println("放入消息隊列:"+i);
        }
    }

2.2.2 消息監(jiān)聽

首先我們需要取實現(xiàn)MessageListener接口的方法

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class sub implements MessageListener {
	
	// 當(dāng)監(jiān)聽到有消息的時候,就會執(zhí)行這個方法
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String msg = new String(message.getBody());
        // 模擬延遲處理
        try {
            Thread.sleep(2000);  // 假設(shè)處理需要2秒
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("處理消息:"+msg);
    }

}

然后可以在開始的redisConfig類里面加上下面的代碼

	/**
     * 因為標(biāo)記了@Bean 注解,所以會在springboot啟動的時候調(diào)用該方法創(chuàng)建,也可以放在其他地方進行創(chuàng)建
     * 當(dāng)調(diào)用這個方法時,RedisConnectionFactory 這個對象已經(jīng)存在于springboot的容器內(nèi),然后調(diào)用這個
     * 方法的時候就會傳入該參數(shù),執(zhí)行方法后會創(chuàng)建一個RedisMessageListenerContainer,這樣可以在其他類
     * 里面管理這些監(jiān)聽MessageListener
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
        // 首先創(chuàng)建一個監(jiān)聽的容器,這個容器可以傳入多個MessageListener
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 注入一個連接池工廠
        container.setConnectionFactory(connectionFactory);
        // 創(chuàng)建一個自己的監(jiān)聽類
        sub sub = new sub();
        // 然后和名為order_queue的通道進行綁定
        container.addMessageListener(sub,new ChannelTopic("order_queue"));
        return container;
    }

2.2.3 結(jié)果

在這里插入圖片描述

2.3 基于Stream進行實現(xiàn)

Redis Stream 是 Redis 5.0 版本引入的一種新的數(shù)據(jù)結(jié)構(gòu),用于存儲和操作消息流。它類似于消息隊列,但提供了更豐富的功能,允許你以有序、可持久化的方式存儲多個字段-值對的消息。

2.3.1 優(yōu)點

  • 持久化存儲:Stream可以持久化消息到磁盤,即使Redis服務(wù)器重啟,消息也不會丟失。
  • 有序性:Stream保證了消息的有序性,每個消息都有一個唯一的ID,按照進入隊列的順序排列。
  • 多消費者:Stream支持多個消費者,可以有不同的組(Consumer Groups)消費同一個Stream,而且每個消費者可以獨立消費,不會互相影響。
  • 消息確認(rèn)機制:消費者可以讀取消息并進行確認(rèn)(ACK),確保消息不會因為消費者故障而丟失。
  • 消息回溯:Stream允許新的消費者從任意位置開始消費消息,包括從Stream的開始位置,這使得新的消費者可以回溯并處理之前的消息。
  • 靈活的消息長度:Stream中的消息可以是任意長度的字符串,可以包含復(fù)雜的數(shù)據(jù)結(jié)構(gòu),如JSON。
  • 阻塞讀取:消費者可以使用BLPOPBRPOP等命令進行阻塞讀取,直到有新的消息到來。
  • 事務(wù)支持:可以利用Redis的事務(wù)特性,確保消息的寫入和讀取操作是原子性的。
  • 時間戳:每個消息可以包含時間戳字段,便于進行基于時間的消息管理。
  • 易于監(jiān)控:Stream的結(jié)構(gòu)便于監(jiān)控隊列的長度、消費者狀態(tài)等信息。

2.3.2 實現(xiàn)

我們模擬一個搶購訂單場景,比如我們的服務(wù)器只能每秒處理50個請求,請求太多可能會導(dǎo)致我們的服務(wù)直接宕機,那么我們可以把請求放入消息隊列,讓消息隊列來抗住大量的請求。
我們的策略可以是消息隊列限量50個請求,當(dāng)請求到來時,消息數(shù)量大于50n我們直接返回讓用戶重試,服務(wù)太忙的提示,這也是很常見的提示。

import com.xujialin.springboot3_study.entity.order;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;

@RestController
public class streamCont {

    @Resource
    private RedisTemplate<String,Object> redisTemplate;

    private String stream_key = "stream_key";



    @GetMapping("/order")
    public void order() {
        //封裝請求,假裝這是高并發(fā)場景
        for (int j = 0; j < 100; j++) {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    order order = new order(i, i, "goods" + i);
                    HashMap<String,order> map = new HashMap<>();
                    map.put("order", order);
                    Long size = redisTemplate.opsForStream().size(stream_key);
                    if(size > 500){
                        System.out.println("活動太火爆了,請重試!");
                        continue;
                    }
                    redisTemplate.opsForStream().add(stream_key,map);
                }
            }).start();
        }
        System.out.println("恭喜你搶到了");
    }

    @PostConstruct
    public void init(){
        // 第一個是stream的key,第二個是組名
//        redisTemplate.opsForStream().createGroup(stream_key, "g1");
        try {
            redisTemplate.opsForStream().createGroup(stream_key, "g1");
        } catch (RedisSystemException e) {
            // 如果 group 已存在,拋出異常,可忽略
            System.out.println("group已經(jīng)存在");
        }
        for (int i = 0; i < 5; i++) {
            new Thread(new consumer()).start();
        }
    }

    class consumer implements Runnable {

        @Override
        public void run() {
            while(true){
                // 讀取消息
                List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(
                        // 為g1消費者組創(chuàng)建一個消費則名字,可以為每個線程的名字,也可以為一個固定的名字,
                        // 一條消息最多只能被組里面的一個消費者消費,如果一條消息同時被兩個消費者消費,
                        // 那么這兩個消費者應(yīng)該隸屬于不同的消費者組,所以同一個名字或者不同的名字,對于同一個
                        // 消費組沒有太大區(qū)別
                        Consumer.from("g1", Thread.currentThread().getName()),
                        // 創(chuàng)建一個讀取選項,創(chuàng)建一個空的 StreamReadOptions 實例。這是配置讀取選項的起點
                        // .count(1): 設(shè)置讀取操作返回的最大消息數(shù)量。意味著每次讀取操作最多只會返回一條消息。
                        //.block(Duration.ofSeconds(2)): 配置讀取操作為阻塞模式,并設(shè)置阻塞的超時時間為2s,
                        // 也可以設(shè)置單位
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        // 創(chuàng)建一個偏移量,ReadOffset.lastConsumed(): 這是指定讀取消息的偏移量。
                        // 表示從消費者組中最后一次確認(rèn)消費的消息之后開始讀取新的消息。
                        StreamOffset.create( "stream_key", ReadOffset.lastConsumed()));

                // 讀取消息
                if (list != null && !list.isEmpty()) {
                    MapRecord<String, Object, Object> entries = list.get(0);
                    // 模擬處理消息
                    System.out.println(entries);
                    // 確認(rèn)消息
                    redisTemplate.opsForStream().acknowledge("stream_key","g1",entries.getId());
                }
            }
        }
    }
}

還可以使用更優(yōu)雅的實現(xiàn),使用 StreamMessageListenerContainer 可以創(chuàng)建一個更高級的消息監(jiān)聽機制,它允許你注冊 StreamListener,這樣你就可以實現(xiàn)基于事件的異步消息處理,而不是阻塞讀取。這種方式更適合生產(chǎn)環(huán)境,因為它提供了更好的資源管理和錯誤處理機制。

到此這篇關(guān)于Springboot3+Redis實現(xiàn)消息隊列的多種方法小結(jié)的文章就介紹到這了,更多相關(guān)Springboot3 Redis消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論