springboot使用Redis隊列實戰(zhàn)
前言
MQ應用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis來實現(xiàn),可以降低系統(tǒng)的維護成本和實現(xiàn)復雜度,本篇介紹redis中實現(xiàn)消息隊列的幾種方案,并通過springboot實戰(zhàn)使其更易懂。
1. 基于List的 LPUSH+BRPOP 的實現(xiàn)
2. 基于Sorted-Set的實現(xiàn)
3. PUB/SUB,訂閱/發(fā)布模式
4. 基于Stream類型的實現(xiàn)
1. 基于List的 LPUSH+BRPOP 的實現(xiàn)
描述
使用rpush和lpush操作入隊列,lpop和rpop操作出隊列。
List支持多個生產(chǎn)者和消費者并發(fā)進出消息,每個消費者拿到都是不同的列表元素。
優(yōu)點
一旦數(shù)據(jù)到來則立刻醒過來,消息延遲幾乎為零。
缺點
- 不能重復消費,一旦消費就會被刪除
- 不能做廣播模式 , 不支持分組消費
- lpop和rpop會一直空輪訓,消耗資源 ,但可以 引入阻塞讀blpop和brpop 同時也有新的問題 如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常
實戰(zhàn)
代碼
@Slf4j
@Service
public class ListRedisQueue {
//隊列名
public static final String KEY = "listQueue";
@Resource
private RedisTemplate redisTemplate;
public void produce(String message) {
redisTemplate.opsForList().rightPush(KEY, message);
}
public void consume() {
while (true) {
String msg = (String) redisTemplate.opsForList().leftPop(KEY);
log.info("瘋狂獲取消息:" + msg);
}
}
public void blockingConsume() {
while (true) {
List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//隊列沒有元素會阻塞操作,直到隊列獲取新的元素或超時,5表示如果沒元素就每五秒去拿一次消息
return connection.bRPop(5, KEY.getBytes());
}
}, new StringRedisSerializer());
for (Object str : obj) {
log.info("blockingConsume獲取消息 : {}", str);
}
}
}
}測試
lPop/rPop消費數(shù)據(jù)
@Autowired
private ListRedisQueue listRedisQueue;
@Test
public void produce() {
for (int i = 0; i < 5; i++) {
listRedisQueue.produce("第"+i + "個數(shù)據(jù)");
}
}
@Test
public void consume() {
produce();
logger.info("生產(chǎn)消息完畢");
listRedisQueue.consume();
}輸出

blpop / brpop 消費數(shù)據(jù)
@Test
public void blockingConsume() {
produce();
logger.info("生產(chǎn)消息完畢");
listRedisQueue.blockingConsume();
}輸出

2. 基于Sorted-Set的實現(xiàn)延時隊列
描述
其實zset就是sorted set。為了避免sorted set簡寫sset導致命令沖突,所以改為zset。同理例如class-->clazz
sorted set從字面意思上,很容易就可以理解,是個有序且不可重復的數(shù)據(jù)集合。類似set和hash的混合體,但是相比于set,zset內(nèi)部由score進行排序.
優(yōu)點
可以自定義消息ID,在消息ID有意義時,比較重要。
缺點
缺點也明顯,不允許重復消息(因為是集合),同時消息ID確定有錯誤會導致消息的順序出錯。
實戰(zhàn)
代碼
@Slf4j
@Service
public class SortedSetRedisQueue {
//隊列名
public static final String KEY = "sortedSet_queue";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void produce(String msg, Double score) {
// 創(chuàng)建Sorted Set實例
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
// 添加數(shù)據(jù)
zSetOperations.add(KEY, msg, score);
}
public void consumer() throws InterruptedException {
// 創(chuàng)建SortedSet實例
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
while (true) {
// 拿取數(shù)據(jù) (rangeByScore返回有序集合中指定分數(shù)區(qū)間的成員列表。有序集成員按分數(shù)值遞增(從小到大)次序排列)
Set<String> order = zSetOperations.rangeByScore(KEY, 0, System.currentTimeMillis(), 0, 1);
if (ObjectUtils.isEmpty(order)) {
log.info("當前沒有數(shù)據(jù) 當前線程睡眠3秒");
TimeUnit.SECONDS.sleep(3);
// 跳過本次循環(huán) 重新循環(huán)拿取數(shù)據(jù)
continue;
}
// 利用迭代器拿取Set中的數(shù)據(jù)
String massage = order.iterator().next();
// 過河拆遷,拿到就刪除消息
if (zSetOperations.remove(KEY, massage) > 0) {
//做些業(yè)務處理
log.info("我拿到的消息:" + massage);
}
}
}
}測試
@Autowired
private SortedSetRedisQueue sortedSetRedisQueue;
@Test
public void sortedSetProduce() throws InterruptedException {
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
// 生成分數(shù)
double score = System.currentTimeMillis();
sortedSetRedisQueue.produce("第"+i + "個數(shù)據(jù)",score);
}
}
@Test
public void sortedSetConsumer() throws InterruptedException {
sortedSetProduce();
logger.info("生產(chǎn)消息完畢");
sortedSetRedisQueue.consumer();
}
}輸出

3.PUB/SUB,訂閱/發(fā)布模式
描述
SUBSCRIBE,用于訂閱信道
PUBLISH,向信道發(fā)送消息
UNSUBSCRIBE,取消訂閱
此模式允許生產(chǎn)者只生產(chǎn)一次消息,由中間件負責將消息復制到多個消息隊列,每個消息隊列由對應的消費組消費。
優(yōu)點
- 一個消息可以發(fā)布到多個消費者
- 消費者可以同時訂閱多個信道,因此可以接收多種消息(處理時先根據(jù)信道判斷)
- 消息即時發(fā)送,消費者會自動接收到信道發(fā)布的消息
缺點
- 消息發(fā)布時,如果客戶端不在線,則消息丟失
- 消費者處理消息時出現(xiàn)了大量消息積壓,則可能會斷開通道,導致消息丟失
- 消費者接收消息的時間不一定是一致的,可能會有差異(業(yè)務處理需要判重)
實戰(zhàn)
監(jiān)聽器
@Slf4j
@Component
public class RedisMessageListenerListener implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 消息處理
*
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(pattern);
log.info("onMessage --> 消息通道是:{}", channel);
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object deserialize = valueSerializer.deserialize(message.getBody());
log.info("反序列化的結果:{}", deserialize);
if (deserialize == null) return;
String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));
log.info("計算得到的key: {}", md5DigestAsHex);
Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(result)) {
// redis消息進行處理
log.info("接收的結果:{}", deserialize.toString());
} else {
log.info("其他服務處理中");
}
}
}實現(xiàn)MessageListener 接口,就可以通過onMessage()方法接收到消息了,該方法有兩個參數(shù):
- 參數(shù) message 的 getBody() 方法以二進制形式獲取消息體, getChannel() 以二進制形式獲取消息通道
- 參數(shù) pattern 二進制形式的消息通道(實際和 message.getChannel() 返回值相同)
綁定監(jiān)聽器
@Configuration
public class RedisMessageListenerConfig {
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
RedisMessageListenerListener redisMessageListenerListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, new ChannelTopic(PubSubRedisQueue.KEY));
return redisMessageListenerContainer;
}
}RedisMessageListenerContainer 是為Redis消息偵聽器 MessageListener 提供異步行為的容器。處理偵聽、轉(zhuǎn)換和消息分派的低級別詳細信息。
本文使用的是主題訂閱:ChannelTopic,你也可以使用模式匹配:PatternTopic,從而匹配多個信道。
生產(chǎn)者
@Service
public class PubSubRedisQueue {
//隊列名
public static final String KEY = "pub_sub_queue";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void produce(String message) {
redisTemplate.convertAndSend(KEY, message);
}
} 測試
@Slf4j
@RestController
@RequestMapping(value = "/queue")
public class RedisMQController {
@Autowired
private PubSubRedisQueue pubSubRedisQueue;
@RequestMapping(value = "/pubsub/produce", method = RequestMethod.GET)
public void pubsubProduce(@RequestParam(name = "msg") String msg) {
pubSubRedisQueue.produce(msg);
}隨便找個瀏覽器請求生產(chǎn)者接口:

所以每插入一條消息,監(jiān)聽者則立即進去消費

4. 基于Stream類型的實現(xiàn)(Redis Version5.0)
描述
Stream為redis 5.0后新增的數(shù)據(jù)結構。支持多播的可持久化消息隊列,實現(xiàn)借鑒了Kafka設計。

Redis Stream的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內(nèi)容。消息是持久化的,Redis重啟后,內(nèi)容還在。
每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用xadd指令追加消息時自動創(chuàng)建。
每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數(shù)組之上往前移動,表示當前消費組已經(jīng)消費到哪條消息了。每個消費組都有一個Stream內(nèi)唯一的名稱,消費組不會自動創(chuàng)建,它需要單獨的指令xgroup create進行創(chuàng)建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
每個消費組(Consumer Group)的狀態(tài)都是獨立的,相互不受影響。也就是說同一份Stream內(nèi)部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內(nèi)唯一名稱。
消費者(Consumer)內(nèi)部會有個狀態(tài)變量pending_ids,它記錄了當前已經(jīng)被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量在Redis官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數(shù)據(jù)結構,它用來確保客戶端至少消費了消息一次,而不會在網(wǎng)絡傳輸?shù)闹型緛G失了沒處理。
優(yōu)點
- 高性能:可以在非常短的時間內(nèi)處理大量的消息。
- 持久化:支持數(shù)據(jù)持久化,即使Redis服務器宕機,也可以恢復之前的消息。
- 順序性:保證消息的順序性,即使是并發(fā)的消息也會按照發(fā)送順序排列。
- 靈活性:可以方便地擴展和分布式部署,可以滿足不同場景下的需求。
缺點
- 功能相對簡單:Redis Stream相對于其他的消息隊列,功能相對簡單,無法滿足一些復雜的需求。
- 不支持消息回溯:即消費者無法獲取之前已經(jīng)消費過的消息。
- 不支持多消費者分組:無法實現(xiàn)多個消費者并發(fā)消費消息的功能。
實戰(zhàn)
自動ack消費者
@Slf4j
@Component
public class AutoAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {
//分組名
public static final String GROUP = "autoack_stream";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
Map<String, String> map = message.getValue();
log.info("[自動ACK]接收到一個消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);
redisTemplate.opsForStream().delete(GROUP, id.getValue());
}
}手動ack消費者
@Slf4j
@Component
public class BasicAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {
//分組名
public static final String GROUP = "basicack_stream";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
Map<String, String> map = message.getValue();
log.info("[手動ACK]接收到一個消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);
redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue());
//消費完畢刪除該條消息
redisTemplate.opsForStream().delete(GROUP, id.getValue());
}
}綁定關系
@Slf4j
@Configuration
public class RedisStreamConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private AutoAckStreamConsumeListener autoAckStreamConsumeListener;
@Autowired
private BasicAckStreamConsumeListener basicAckStreamConsumeListener;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多獲取多少條消息
.batchSize(3)
// 運行 Stream 的 poll task
.executor(executor)
// Stream 中沒有消息時,阻塞多長時間,需要比 `spring.redis.timeout` 的時間小
.pollTimeout(Duration.ofSeconds(3))
// 獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發(fā)生了異常的處理
.errorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
log.info("出現(xiàn)異常就來這里了" + t);
}
})
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 獨立消費
// 消費組A,自動ack
// 從消費組中沒有分配給消費者的消息開始消費
if (!isStreamGroupExists(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP)){
redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP);
}
streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"),
StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), autoAckStreamConsumeListener);
// 消費組B,不自動ack
if (!isStreamGroupExists(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP)){
redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP);
}
streamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"),
StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), basicAckStreamConsumeListener);
return streamMessageListenerContainer;
}
/**
* 判斷該消費組是否存在
* @param streamKey
* @param groupName
* @return
*/
public boolean isStreamGroupExists(String streamKey, String groupName) {
RedisStreamCommands commands = redisConnectionFactory.getConnection().streamCommands();
//首先檢查Stream Key是否存在,否則下面代碼可能會因為嘗試檢查不存在的Stream Key而導致異常
if (!redisTemplate.hasKey(streamKey)){
return false;
}
//獲取streamKey下的所有groups
StreamInfo.XInfoGroups xInfoGroups = commands.xInfoGroups(streamKey.getBytes());
AtomicBoolean exists= new AtomicBoolean(false);
xInfoGroups.forEach(xInfoGroup -> {
if (xInfoGroup.groupName().equals(groupName)){
exists.set(true);
}
});
return exists.get();
}
}
生產(chǎn)工具
@Slf4j
@Service
public class StreamRedisQueue {
//隊列名
public static final String KEY = "stream_queue";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public String produce(Map<String, String> value) {
return redisTemplate.opsForStream().add(KEY, value).getValue();
}
public void createGroup(String key, String group){
redisTemplate.opsForStream().createGroup(key, group);
}
}測試
生產(chǎn)消息
@Slf4j
@RestController
@RequestMapping(value = "/queue")
public class RedisMQController {
@Autowired
private StreamRedisQueue streamRedisQueue;
@RequestMapping(value = "/stream/produce", method = RequestMethod.GET)
public void streamProduce() {
Map<String, String> map = new HashMap<>();
map.put("劉德華", "大家好我是劉德華");
map.put("周杰倫", "周杰倫");
map.put("time", DateUtil.now());
String result = streamRedisQueue.produce(map);
log.info("返回結果:{}", result);
}
}只要有消息,消費者就會消費

到此這篇關于springboot使用Redis隊列實戰(zhàn)的文章就介紹到這了,更多相關springboot Redis隊列實內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改
這篇文章主要介紹了Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改功能的實現(xiàn)過程,感興趣的小伙伴們可以參考一下2016-01-01
基于Java實現(xiàn)回調(diào)監(jiān)聽工具類
這篇文章主要為大家詳細介紹了如何基于Java實現(xiàn)一個回調(diào)監(jiān)聽工具類,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2025-04-04
解決java.util.NoSuchElementException異常正確方法
java.util.NoSuchElementException是Java中的一種異常,表示在迭代器或枚舉中找不到元素,這篇文章主要給大家介紹了關于解決java.util.NoSuchElementException異常的相關資料,需要的朋友可以參考下2023-11-11
SpringBoot整合Mybatis?LocalDateTime?映射失效的解決
這篇文章主要介紹了SpringBoot整合Mybatis?LocalDateTime?映射失效的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
為何HashSet中使用PRESENT而不是null作為value
這篇文章主要介紹了為何HashSet中使用PRESENT而不是null作為value,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10

