欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis消息隊列、阻塞隊列、延時隊列的實現(xiàn)

 更新時間:2023年11月10日 11:28:07   作者:夏尋.  
Redis是一種常用的內(nèi)存數(shù)據(jù)庫,它提供了豐富的功能,通常用于數(shù)據(jù)緩存和分布式隊列,本文主要介紹了Redis消息隊列、阻塞隊列、延時隊列的實現(xiàn),感興趣的可以了解一下

redis 隊列的優(yōu)點是輕量級,業(yè)務(wù)足夠簡單時不需要使用rabbitMq這樣專業(yè)的消息中間件;缺點是彈出隊列中的元素時,即使該消息處理失敗也無法再次進行消費

Redis隊列 List

 一、普通隊列

可以直接使用Redis的list數(shù)據(jù)類型實現(xiàn)消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左進右出的隊列
  • rpush+lpop:左出右進的隊列

使用redis的命令來模擬普通隊列

使用lpush命令生產(chǎn)消息:

>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"

使用rpop命令消費消息:

>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"

使用Java代碼來實現(xiàn)普通隊列:

生產(chǎn)者SingleProducer

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

/**
 * 生產(chǎn)者
 */
public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

消費者SingleConsumer:

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消費者
 */
public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

上面的代碼已經(jīng)基本實現(xiàn)了普通隊列的生產(chǎn)與消費,但是上述的例子中消息的消費者存在兩個問題:

  • 普通的redis隊列,為了實現(xiàn)業(yè)務(wù),通常會使用while進行循環(huán),這樣的話沒有消息時依舊會頻繁的執(zhí)行循環(huán),造成cpu的空轉(zhuǎn),所以一般會在代碼中增加sleep來解決該問題,但因此又會造成消息延遲問題。
  • 如果生產(chǎn)者速度大于消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內(nèi)存空間。

二、Redis阻塞隊列

redis隊列提供了 “阻塞式” 拉取消息的命令:BRPOP / BLPOP,這里的 B 指的是阻塞(Block)。
如果隊列為空,消費者在拉取消息時就「阻塞等待」,一旦有新消息過來,就通知消費者立即處理新消息。

使用redis的brpop命令來模擬阻塞隊列

>brpop queue:single 30

使用 BRPOP 這種阻塞式方式拉取消息時,還支持傳入一個「超時時間」,如果設(shè)置為 0,則表示不設(shè)置超時,直到有新消息才返回,否則會在指定的超時時間后返回 NULL

Java阻塞隊列生產(chǎn)者實現(xiàn)如下:

package com.cxh;

import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
 * 生產(chǎn)者類
 * 生產(chǎn)者每隔600ms生成一條消息
 * */
class MessageProducer extends Thread{
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void putMessage(String mess){
        Jedis jedis = new Jedis("127.0.0.1", 6379);
/*        jedis.auth("123456");*/
        Long size = jedis.lpush(MESSAGE_KEY, mess);
        System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
        count++;
    }
    @Override
    public synchronized  void run() {
        for(int i = 0 ; i < 1; i++){
            putMessage("message" + count);
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Java阻塞隊列消費者實現(xiàn)如下:

package com.cxh.Component;

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.List;

@Component
public class component implements CommandLineRunner {
    public static final String MESSAGE_KEY = "message:queue";
    @Override
    public void run(String... args) throws Exception {
/*        //todo: 需要執(zhí)行的方法
        System.out.println(Arrays.toString(args));*/
        System.out.println("comsumer 111");
        Jedis jedis = new Jedis("127.0.0.1", 6379);
//        String message = jedis.rpop(MESSAGE_KEY);
//        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message); //多線程的時候使用
        while (true){
            List<String> message = jedis.brpop(0, MESSAGE_KEY);
            System.out.println(message.toString());
        }
    }

}

上面的代碼已經(jīng)實現(xiàn)了延遲隊列的生產(chǎn)與消費,需要注意的是:

  • 無法實現(xiàn)一次生產(chǎn)多次消費(使用 pub/sub 發(fā)布訂閱模式,可以實現(xiàn) 1:N 的消息隊列,即一次生產(chǎn),多端消費
  • 阻塞時間結(jié)束后代碼會繼續(xù)向下執(zhí)行
  • 如果設(shè)置的超時時間太長,這個連接太久沒有活躍過,可能會被 Redis Server 判定為無效連接,之后 Redis Server 會強制把這個客戶端踢下線。所以,客戶端要有處理機制。
    實際項目中redis連接超時時間遠大于20s,因此正常情況不會出現(xiàn)redis超時問題。以防萬一增加redis異常捕獲,出現(xiàn)異常時殺掉當(dāng)前進程,同時supervisord會自動重新拉起該進程

三、Redis延遲隊列

zset 會按 score 進行排序,如果 score 代表想要執(zhí)行時間的時間戳。在某個時間將它插入zset集合中,它會按照時間戳大小進行排序,也就是對執(zhí)行時間前后進行排序。

起一個死循環(huán)線程不斷地進行取第一個key值,如果當(dāng)前時間戳大于等于該key值的socre就將它取出來進行消費刪除,可以達到延時執(zhí)行的目的。

下面使用redis的zset來模擬延時隊列

命令生產(chǎn)者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0

命令消費者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1

使用Java代碼來實現(xiàn)普通隊列:

生產(chǎn)者DelayProducer :

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Random;

/**
 * 生產(chǎn)者
 */
public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int second = random.nextInt(30); // 隨機訂單失效時間
            jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
        }
        jedis.close();
    }
}

消費者DelayConsumer :

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 消費者
 */
public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time < now) {
                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
                        System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    break;
                }
            }
        }
    }
}

應(yīng)用場景

延時隊列在項目中的應(yīng)用還是比較多的,尤其像電商類平臺:

  • 12306 下單成功后,在半個小時內(nèi)沒有支付,自動取消訂單。
  • 如果訂單一直處于某一個未完結(jié)狀態(tài)時,及時處理關(guān)單,并退還庫存。
  • 淘寶新建商戶一個月內(nèi)還沒上傳商品信息,將凍結(jié)商鋪等。
  • 會議預(yù)定系統(tǒng),在預(yù)定會議開始前半小時通知所有預(yù)定該會議的用戶。
  • 安全工單超過 24 小時未處理,則自動拉企業(yè)群提醒相關(guān)責(zé)任人。
  • 用戶下單外賣以后,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時。
  • 外賣平臺發(fā)送訂餐通知,下單成功后 60s 給用戶推送短信。

到此這篇關(guān)于Redis消息隊列、阻塞隊列、延時隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)Redis消息隊列、阻塞隊列、延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • Redis并發(fā)訪問問題詳細講解

    Redis并發(fā)訪問問題詳細講解

    本文主要介紹了Redis如何應(yīng)對并發(fā)訪問,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-12-12
  • Redis分片集群的實現(xiàn)示例

    Redis分片集群的實現(xiàn)示例

    本文介紹了搭建Redis分片集群,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-12-12
  • Redis簡介

    Redis簡介

    Redis是一個開源,高級的鍵值存儲和一個適用的解決方案,用于構(gòu)建高性能,可擴展的Web應(yīng)用程序。關(guān)于redis的相關(guān)知識大家可以通過本教程學(xué)習(xí)
    2017-05-05
  • AOP?Redis自定義注解實現(xiàn)細粒度接口IP訪問限制

    AOP?Redis自定義注解實現(xiàn)細粒度接口IP訪問限制

    這篇文章主要為大家介紹了AOP?Redis自定義注解實現(xiàn)細粒度接口IP訪問限制,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • Redis實現(xiàn)布隆過濾器的代碼詳解

    Redis實現(xiàn)布隆過濾器的代碼詳解

    布隆過濾器(Bloom?Filter)是Redis?4.0版本提供的新功能,它被作為插件加載到Redis服務(wù)器中,給Redis提供強大的去重功能,本文將給大家詳細介紹一下Redis布隆過濾器,文中有相關(guān)的代碼示例,需要的朋友可以參考下
    2023-07-07
  • 解決Redis啟動警告問題

    解決Redis啟動警告問題

    這篇文章介紹了解決Redis啟動警告問題的方法,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-02-02
  • NestJS+Redis實現(xiàn)緩存步驟詳解

    NestJS+Redis實現(xiàn)緩存步驟詳解

    這篇文章主要介紹了NestJS+Redis實現(xiàn)緩存,本文分步驟給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-08-08
  • 解讀Redis秒殺優(yōu)化方案(阻塞隊列+基于Stream流的消息隊列)

    解讀Redis秒殺優(yōu)化方案(阻塞隊列+基于Stream流的消息隊列)

    該文章介紹了使用Redis的阻塞隊列和Stream流的消息隊列來優(yōu)化秒殺系統(tǒng)的方案,通過將秒殺流程拆分為兩條流水線,使用Redis緩存緩解數(shù)據(jù)庫壓力,并結(jié)合Lua腳本進行原子性判斷,使用阻塞隊列和消息隊列異步處理訂單,有效提高了系統(tǒng)的并發(fā)處理能力和可用性
    2025-02-02
  • Redis如何實現(xiàn)分布式鎖

    Redis如何實現(xiàn)分布式鎖

    這篇文章主要介紹了Redis如何實現(xiàn)分布式鎖問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • 基于Redis實現(xiàn)每日登錄失敗次數(shù)限制

    基于Redis實現(xiàn)每日登錄失敗次數(shù)限制

    這篇文章主要介紹了通過redis實現(xiàn)每日登錄失敗次數(shù)限制的問題,通過redis記錄登錄失敗的次數(shù),以用戶的username為key,本文給出了實例代碼,需要的朋友可以參考下
    2019-08-08

最新評論