Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)
redis 隊(duì)列的優(yōu)點(diǎn)是輕量級(jí),業(yè)務(wù)足夠簡(jiǎn)單時(shí)不需要使用rabbitMq這樣專業(yè)的消息中間件;缺點(diǎn)是彈出隊(duì)列中的元素時(shí),即使該消息處理失敗也無(wú)法再次進(jìn)行消費(fèi)
Redis隊(duì)列 List

一、普通隊(duì)列
可以直接使用Redis的list數(shù)據(jù)類型實(shí)現(xiàn)消息隊(duì)列,只需簡(jiǎn)單的兩個(gè)指令lpush和rpop或者rpush和lpop。
- lpush+rpop:左進(jìn)右出的隊(duì)列
- rpush+lpop:左出右進(jìn)的隊(duì)列
使用redis的命令來(lái)模擬普通隊(duì)列
使用lpush命令生產(chǎn)消息:
>lpush queue:single 1 "1" >lpush queue:single 2 "2" >lpush queue:single 3 "3"
使用rpop命令消費(fèi)消息:
>rpop queue:single "1" >rpop queue:single "2" >rpop queue:single "3"
使用Java代碼來(lái)實(shí)現(xiàn)普通隊(duì)列:
生產(chǎn)者SingleProducer
package com.morris.redis.demo.queue.single;
import redis.clients.jedis.Jedis;
/**
* 生產(chǎn)者
*/
public class SingleProducer {
public static final String SINGLE_QUEUE_NAME = "queue:single";
public static void main(String[] args) {
Jedis jedis = new Jedis();
for (int i = 0; i < 100; i++) {
jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
}
jedis.close();
}
}
消費(fèi)者SingleConsumer:
package com.morris.redis.demo.queue.single;
import redis.clients.jedis.Jedis;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 消費(fèi)者
*/
public class SingleConsumer {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis();
while (true) {
String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
if(Objects.nonNull(message)) {
System.out.println(message);
} else {
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
}
上面的代碼已經(jīng)基本實(shí)現(xiàn)了普通隊(duì)列的生產(chǎn)與消費(fèi),但是上述的例子中消息的消費(fèi)者存在兩個(gè)問(wèn)題:
- 普通的redis隊(duì)列,為了實(shí)現(xiàn)業(yè)務(wù),通常會(huì)使用while進(jìn)行循環(huán),這樣的話沒(méi)有消息時(shí)依舊會(huì)頻繁的執(zhí)行循環(huán),造成cpu的空轉(zhuǎn),所以一般會(huì)在代碼中增加sleep來(lái)解決該問(wèn)題,但因此又會(huì)造成消息延遲問(wèn)題。
- 如果生產(chǎn)者速度大于消費(fèi)者消費(fèi)速度,消息隊(duì)列長(zhǎng)度會(huì)一直增大,時(shí)間久了會(huì)占用大量?jī)?nèi)存空間。
二、Redis阻塞隊(duì)列
redis隊(duì)列提供了 “阻塞式” 拉取消息的命令:BRPOP / BLPOP,這里的 B 指的是阻塞(Block)。
如果隊(duì)列為空,消費(fèi)者在拉取消息時(shí)就「阻塞等待」,一旦有新消息過(guò)來(lái),就通知消費(fèi)者立即處理新消息。
使用redis的brpop命令來(lái)模擬阻塞隊(duì)列
>brpop queue:single 30
使用 BRPOP 這種阻塞式方式拉取消息時(shí),還支持傳入一個(gè)「超時(shí)時(shí)間」,如果設(shè)置為 0,則表示不設(shè)置超時(shí),直到有新消息才返回,否則會(huì)在指定的超時(shí)時(shí)間后返回 NULL
Java阻塞隊(duì)列生產(chǎn)者實(shí)現(xiàn)如下:
package com.cxh;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;
/**
* 生產(chǎn)者類
* 生產(chǎn)者每隔600ms生成一條消息
* */
class MessageProducer extends Thread{
public static final String MESSAGE_KEY = "message:queue";
private volatile int count;
public void putMessage(String mess){
Jedis jedis = new Jedis("127.0.0.1", 6379);
/* jedis.auth("123456");*/
Long size = jedis.lpush(MESSAGE_KEY, mess);
System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
count++;
}
@Override
public synchronized void run() {
for(int i = 0 ; i < 1; i++){
putMessage("message" + count);
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}Java阻塞隊(duì)列消費(fèi)者實(shí)現(xiàn)如下:
package com.cxh.Component;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.List;
@Component
public class component implements CommandLineRunner {
public static final String MESSAGE_KEY = "message:queue";
@Override
public void run(String... args) throws Exception {
/* //todo: 需要執(zhí)行的方法
System.out.println(Arrays.toString(args));*/
System.out.println("comsumer 111");
Jedis jedis = new Jedis("127.0.0.1", 6379);
// String message = jedis.rpop(MESSAGE_KEY);
// System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message); //多線程的時(shí)候使用
while (true){
List<String> message = jedis.brpop(0, MESSAGE_KEY);
System.out.println(message.toString());
}
}
}上面的代碼已經(jīng)實(shí)現(xiàn)了延遲隊(duì)列的生產(chǎn)與消費(fèi),需要注意的是:
- 無(wú)法實(shí)現(xiàn)一次生產(chǎn)多次消費(fèi)(使用 pub/sub 發(fā)布訂閱模式,可以實(shí)現(xiàn) 1:N 的消息隊(duì)列,即一次生產(chǎn),多端消費(fèi))
- 阻塞時(shí)間結(jié)束后代碼會(huì)繼續(xù)向下執(zhí)行
- 如果設(shè)置的超時(shí)時(shí)間太長(zhǎng),這個(gè)連接太久沒(méi)有活躍過(guò),可能會(huì)被 Redis Server 判定為無(wú)效連接,之后 Redis Server 會(huì)強(qiáng)制把這個(gè)客戶端踢下線。所以,客戶端要有處理機(jī)制。
實(shí)際項(xiàng)目中redis連接超時(shí)時(shí)間遠(yuǎn)大于20s,因此正常情況不會(huì)出現(xiàn)redis超時(shí)問(wèn)題。以防萬(wàn)一增加redis異常捕獲,出現(xiàn)異常時(shí)殺掉當(dāng)前進(jìn)程,同時(shí)supervisord會(huì)自動(dòng)重新拉起該進(jìn)程
三、Redis延遲隊(duì)列
zset 會(huì)按 score 進(jìn)行排序,如果 score 代表想要執(zhí)行時(shí)間的時(shí)間戳。在某個(gè)時(shí)間將它插入zset集合中,它會(huì)按照時(shí)間戳大小進(jìn)行排序,也就是對(duì)執(zhí)行時(shí)間前后進(jìn)行排序。
起一個(gè)死循環(huán)線程不斷地進(jìn)行取第一個(gè)key值,如果當(dāng)前時(shí)間戳大于等于該key值的socre就將它取出來(lái)進(jìn)行消費(fèi)刪除,可以達(dá)到延時(shí)執(zhí)行的目的。
下面使用redis的zset來(lái)模擬延時(shí)隊(duì)列
命令生產(chǎn)者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3 (integer) 0
命令消費(fèi)者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores 1) "order1" 2) "1" 127.0.0.1:6379> zrem queue:delay order1 (integer) 1
使用Java代碼來(lái)實(shí)現(xiàn)普通隊(duì)列:
生產(chǎn)者DelayProducer :
package com.morris.redis.demo.queue.delay;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Random;
/**
* 生產(chǎn)者
*/
public class DelayProducer {
public static final String DELAY_QUEUE_NAME = "queue:delay";
public static void main(String[] args) {
Jedis jedis = new Jedis();
long now = new Date().getTime();
Random random = new Random();
for (int i = 0; i < 10; i++) {
int second = random.nextInt(30); // 隨機(jī)訂單失效時(shí)間
jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
}
jedis.close();
}
}
消費(fèi)者DelayConsumer :
package com.morris.redis.demo.queue.delay;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 消費(fèi)者
*/
public class DelayConsumer {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis();
while (true) {
long now = new Date().getTime();
Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
if(tupleSet.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(500);
} else {
for (Tuple tuple : tupleSet) {
Double score = tuple.getScore();
long time = score.longValue();
if(time < now) {
jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
} else {
TimeUnit.MILLISECONDS.sleep(500);
}
break;
}
}
}
}
}
應(yīng)用場(chǎng)景
延時(shí)隊(duì)列在項(xiàng)目中的應(yīng)用還是比較多的,尤其像電商類平臺(tái):
- 12306 下單成功后,在半個(gè)小時(shí)內(nèi)沒(méi)有支付,自動(dòng)取消訂單。
- 如果訂單一直處于某一個(gè)未完結(jié)狀態(tài)時(shí),及時(shí)處理關(guān)單,并退還庫(kù)存。
- 淘寶新建商戶一個(gè)月內(nèi)還沒(méi)上傳商品信息,將凍結(jié)商鋪等。
- 會(huì)議預(yù)定系統(tǒng),在預(yù)定會(huì)議開(kāi)始前半小時(shí)通知所有預(yù)定該會(huì)議的用戶。
- 安全工單超過(guò) 24 小時(shí)未處理,則自動(dòng)拉企業(yè)群提醒相關(guān)責(zé)任人。
- 用戶下單外賣(mài)以后,距離超時(shí)時(shí)間還有 10 分鐘時(shí)提醒外賣(mài)小哥即將超時(shí)。
- 外賣(mài)平臺(tái)發(fā)送訂餐通知,下單成功后 60s 給用戶推送短信。
到此這篇關(guān)于Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))
- 基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)化方案小結(jié)
- 生產(chǎn)redisson延時(shí)隊(duì)列不消費(fèi)問(wèn)題排查解決
- Redisson 分布式延時(shí)隊(duì)列 RedissonDelayedQueue 運(yùn)行流程
- redis使用zset實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的示例代碼
- Redis簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)示例
- Redisson延時(shí)隊(duì)列RedissonDelayed的具體使用
- redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
相關(guān)文章
Redis并發(fā)訪問(wèn)問(wèn)題詳細(xì)講解
本文主要介紹了Redis如何應(yīng)對(duì)并發(fā)訪問(wèn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-12-12
AOP?Redis自定義注解實(shí)現(xiàn)細(xì)粒度接口IP訪問(wèn)限制
這篇文章主要為大家介紹了AOP?Redis自定義注解實(shí)現(xiàn)細(xì)粒度接口IP訪問(wèn)限制,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10
Redis實(shí)現(xiàn)布隆過(guò)濾器的代碼詳解
布隆過(guò)濾器(Bloom?Filter)是Redis?4.0版本提供的新功能,它被作為插件加載到Redis服務(wù)器中,給Redis提供強(qiáng)大的去重功能,本文將給大家詳細(xì)介紹一下Redis布隆過(guò)濾器,文中有相關(guān)的代碼示例,需要的朋友可以參考下2023-07-07
NestJS+Redis實(shí)現(xiàn)緩存步驟詳解
這篇文章主要介紹了NestJS+Redis實(shí)現(xiàn)緩存,本文分步驟給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08
解讀Redis秒殺優(yōu)化方案(阻塞隊(duì)列+基于Stream流的消息隊(duì)列)
該文章介紹了使用Redis的阻塞隊(duì)列和Stream流的消息隊(duì)列來(lái)優(yōu)化秒殺系統(tǒng)的方案,通過(guò)將秒殺流程拆分為兩條流水線,使用Redis緩存緩解數(shù)據(jù)庫(kù)壓力,并結(jié)合Lua腳本進(jìn)行原子性判斷,使用阻塞隊(duì)列和消息隊(duì)列異步處理訂單,有效提高了系統(tǒng)的并發(fā)處理能力和可用性2025-02-02
基于Redis實(shí)現(xiàn)每日登錄失敗次數(shù)限制
這篇文章主要介紹了通過(guò)redis實(shí)現(xiàn)每日登錄失敗次數(shù)限制的問(wèn)題,通過(guò)redis記錄登錄失敗的次數(shù),以用戶的username為key,本文給出了實(shí)例代碼,需要的朋友可以參考下2019-08-08

