欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redisson延遲隊(duì)列執(zhí)行流程源碼解析

 更新時(shí)間:2022年09月26日 17:10:28   作者:老程不禿  
這篇文章主要為大家介紹了Redisson延遲隊(duì)列執(zhí)行流程源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

在實(shí)際分布式項(xiàng)目中延遲任務(wù)一般不會(huì)使用JDK自帶的延遲隊(duì)列,因?yàn)樗腔贘VM內(nèi)存存儲(chǔ),沒(méi)有持久化操作,所以當(dāng)服務(wù)重啟后就會(huì)丟失任務(wù)。

在項(xiàng)目中可以使用MQ死信隊(duì)列或redisson延遲隊(duì)列進(jìn)行處理延遲任務(wù),本篇文章將講述redisson延遲隊(duì)列的使用demo和其執(zhí)行源碼。

demo示例

通過(guò)腳手架創(chuàng)建一個(gè)簡(jiǎn)易springboot項(xiàng)目,引入redisson的maven依賴,并簡(jiǎn)單配置redisson連接屬性。

    <!-- redisson引用 -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.6</version>
    </dependency>
@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    /**
     * 獲取redissonClient實(shí)例
     *
     * @return
     * @throws Exception
     */
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        String address = "redis://" + host + ":" + port;
        config.useSingleServer().setAddress(address);
        return Redisson.create(config);
    }
}

定義一個(gè)redisson延遲隊(duì)列插入和獲取任務(wù)處理類RedissonQueueHandle,通過(guò)控制spring的bean加載周期開(kāi)啟獨(dú)立線程獲取延遲任務(wù)。這里獲取延遲任務(wù)使用了三種方法,除了第一種阻塞式獲取任務(wù)方法外,其他兩種方法都不是百分比按照延遲參數(shù)獲取到任務(wù),因?yàn)槭菚r(shí)間間隔定時(shí)循環(huán)獲取延遲任務(wù)。

/**
 * redisson延遲隊(duì)列處理器
 *
 * @author zrh
 */
@Slf4j
@Component
public class RedissonQueueHandle implements InitializingBean {
    private final RBlockingQueue<RedisDataEntity<?>> queue;
    private final RDelayedQueue<RedisDataEntity<?>> delayedQueue;
    public RedissonQueueHandle (RedissonClient client) {
        this.queue = client.getBlockingQueue("redisson:queue");
        this.delayedQueue = client.getDelayedQueue(queue);
    }
    @Override
    public void afterPropertiesSet () {
        // 開(kāi)一個(gè)線程阻塞式獲取任務(wù)
        thread();
        // 使用netty時(shí)間輪循環(huán)獲取任務(wù)
//        watchDog(new HashedWheelTimer());
        // 使用線程池定時(shí)獲取任務(wù)
//        schedule();
    }
    private void thread () {
        new Thread(() -> {
            while (true) {
                try {
                    RedisDataEntity entity = queue.take();
                    log.info("本次獲取數(shù)據(jù):{},耗時(shí):{}", entity, System.currentTimeMillis() - entity.getTime());
                } catch (Exception e) {
                }
            }
        }, "zrh").start();
    }
    private void watchDog (final HashedWheelTimer timer) {
        timer.newTimeout(timeout -> {
            RedisDataEntity entity = queue.poll();
            if (null != entity) {
                log.info("本次獲取數(shù)據(jù):{},耗時(shí):{}", entity, System.currentTimeMillis() - entity.getTime());
            }
            watchDog(timer);
        }, 3, TimeUnit.SECONDS);
    }
    private void schedule () {
        Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> {
            RedisDataEntity entity = queue.poll();
            if (null != entity) {
                log.info("本次獲取數(shù)據(jù):{},耗時(shí):{}", entity, System.currentTimeMillis() - entity.getTime());
            }
        }, 5, 5, TimeUnit.SECONDS);
    }
    /**
     * 放入redis,定時(shí)過(guò)期
     *
     * @param entity
     */
    public void offer (RedisDataEntity entity) {
        try {
            delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("放入redis延遲隊(duì)列異常", e);
        }
    }
}

放入redisson延遲隊(duì)列可以是字符串也可以是對(duì)象RedisDataEntity,因?yàn)橛羞M(jìn)行IO磁盤存儲(chǔ)操作,所以必須實(shí)現(xiàn)Serializable序列化接口。

/**
 * @Author: ZRH
 * @Date: 2022/1/10 11:54
 */
@Data
public class RedisDataEntity<T> implements Serializable {
    /**
     * 數(shù)據(jù)
     */
    private final T data;
    /**
     * 過(guò)期時(shí)間(單位:毫秒)
     */
    private final Long expire;
    /**
     * 添加時(shí)間
     */
    private final Long time;
    public RedisDataEntity (T data, Long expire, Long time) {
        this.data = data;
        this.expire = expire;
        this.time = time;
    }
}

然后開(kāi)一個(gè)插入數(shù)據(jù)接口:

/**
 * @Author: ZRH
 * @Date: 2022/1/10 11:45
 */
@Slf4j
@RestController
public class IndexController {
    private final RedissonQueueHandle redisHandle;
    public IndexController (RedissonQueueHandle redisHandle) {
        this.redisHandle = redisHandle;
    }
    @PostMapping("redissonQueue")
    public String redissonQueue (@RequestParam String data, @RequestParam Long expire) {
        RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis());
        log.info("本次添加數(shù)據(jù):{}", entity);
        redisHandle.offer(entity);
        return "ok";
    }
}
訪問(wèn)接口設(shè)置延遲30秒:http://localhost:8802/redissonQueue?data=a&expire=30000,打印結(jié)果如下
2022-01-14 14:21:52.140  INFO 10808 --- [nio-8802-exec-1] c.r.web.controller.IndexController       : 本次添加數(shù)據(jù):RedisDataEntity(data=a, expire=30000, time=1642141312135)
2022-01-14 14:21:52.887  INFO 10808 --- [nio-8802-exec-2] c.r.web.controller.IndexController       : 本次添加數(shù)據(jù):RedisDataEntity(data=a, expire=30000, time=1642141312887)
2022-01-14 14:22:22.240  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次獲取數(shù)據(jù):RedisDataEntity(data=a, expire=30000, time=1642141312135),耗時(shí):30105
2022-01-14 14:22:22.914  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次獲取數(shù)據(jù):RedisDataEntity(data=a, expire=30000, time=1642141312887),耗時(shí):30027

初始執(zhí)行流程源碼解析 redisson延遲隊(duì)列最終都是和redis服務(wù)進(jìn)行交互的,那可以使用monitor命令查看redis中執(zhí)行了哪些命令,這樣對(duì)了解其執(zhí)行流程有很大幫助。

上圖是項(xiàng)目啟動(dòng)時(shí),對(duì)redis發(fā)送的幾個(gè)指令

"SUBSCRIBE":訂閱隊(duì)列"redisson_delay_queue_channel:{redisson:queue}",里面有個(gè)定時(shí)任務(wù)通過(guò)該隊(duì)列獲取數(shù)據(jù)

"zrangebyscore":獲取"redisson_delay_queue_timeout:{redisson:queue}"集合中排序score值在0到1642148406748(當(dāng)前時(shí)間戳)內(nèi)的前100元素

"zrange":獲取"redisson_delay_queue_timeout:{redisson:queue}"集合中第一個(gè)元素,用于獲取下一個(gè)元素的到期時(shí)間

"BLPOP":取出并移除"redisson:queue"列表里的第一個(gè)元素,如果沒(méi)有元素就一直等待阻塞。所以這里會(huì)阻塞著

"rpush":如果指令"zrangebyscore"獲取到了元素,那就將元素推送到隊(duì)列redisson:queue內(nèi)

"lrem":如果指令"zrangebyscore"獲取到了元素,那就刪除隊(duì)列"redisson_delay_queue:{redisson:queue}內(nèi)元素為v的第一個(gè)元素

SUBSCRIBE指令

進(jìn)入RedissonDelayedQueue延遲隊(duì)列的構(gòu)造函數(shù),里面就有上述執(zhí)行指令的lua腳本命令(為了不影響篇幅刪了一部分代碼,下同):

    ......
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        // list結(jié)構(gòu),用于延遲隊(duì)列的訂閱發(fā)布
        channelName = prefixName("redisson_delay_queue_channel", getRawName());
        // list結(jié)構(gòu),存放元素原始順序
        queueName = prefixName("redisson_delay_queue", getRawName());
        // zset結(jié)構(gòu),存放未到期元素,并按照過(guò)期時(shí)間進(jìn)行排好序
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }
            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        queueTransferService.schedule(queueName, task);        
        this.queueTransferService = queueTransferService;
    }

繼續(xù)跟進(jìn)queueTransferService.schedule(queueName, task)方法,因?yàn)榈谝淮芜M(jìn)入tasks集合,所以最后執(zhí)行start()方法:

    ......
    private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }

進(jìn)入QueueTransferTask,繼續(xù)跟進(jìn)schedulerTopic.addListener(...)方法:

    ......
    private int messageListenerId;
    private int statusListenerId;
    public void start() {
        RTopic schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

然后會(huì)進(jìn)入PublishSubscribeService.subscribe(...)方法:

注意:這里繼續(xù)調(diào)用重載方法subscribe(...)時(shí)設(shè)置了參數(shù):PubSubType.SUBSCRIBE

    ......
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
    }
    private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<>();
        AsyncSemaphore lock = getSemaphore(channelName);
        // 創(chuàng)建一個(gè)線程任務(wù)放入lock對(duì)象
        lock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                return;
            }
            subscribe(codec, channelName, entry, promise, type, lock, listeners);
        });
        return promise;
    }

AsyncSemaphore對(duì)象的acquire(...)方法會(huì)把線程任務(wù)放入自身隊(duì)列l(wèi)isteners里,然后依次讀取執(zhí)行線程任務(wù);

public class AsyncSemaphore {
    private final AtomicInteger counter;
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
    public void acquire(Runnable listener) {
        listeners.add(listener);
        tryRun();
    }
    private void tryRun() {
        if (counter.decrementAndGet() >= 0) {
            Runnable listener = listeners.poll();
            if (listener == null) {
                counter.incrementAndGet();
                return;
            }
            listener.run();
        } else {
            if (counter.incrementAndGet() > 0) {
                tryRun();
            }
        }
    }   
}

然后繼續(xù)跟進(jìn)方法subscribe(codec, channelName, entry, promise, type, lock, listeners):

    .....
    private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
                            RPromise<PubSubConnectionEntry> promise, PubSubType type,
                            AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
        freePubSubLock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                freePubSubLock.release();
                return;
            }
            MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry);
            // 第一次進(jìn)入entry2PubSubConnection集合為null,所以使用默認(rèn)值,最后 freeEntry == null
            PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());
            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                freePubSubLock.release();
                connect(codec, channelName, msEntry, promise, type, lock, listeners);
                return;
            }
            ......
        });
    }

繼續(xù)跟進(jìn)方法connect(codec, channelName, msEntry, promise, type, lock, listeners):

    ......
    private void connect(Codec codec, ChannelName channelName,
                         MasterSlaveEntry msEntry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(msEntry, channelName);
        promise.onComplete((res, e) -> {...});
        connFuture.onComplete((conn, ex) -> {
            if (ex != null) {...}
            freePubSubLock.acquire(() -> {
                PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
                int remainFreeAmount = entry.tryAcquire();
                PubSubKey key = new PubSubKey(channelName, msEntry);
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
                if (oldEntry != null) {...}
                if (remainFreeAmount > 0) {
                    addFreeConnectionEntry(channelName, entry);
                }
                freePubSubLock.release();
                RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
                ChannelFuture future;
                // 這里通過(guò)上述重載方法傳遞的參數(shù)可知,最后走else邏輯
                if (PubSubType.PSUBSCRIBE == type) {
                    future = entry.psubscribe(codec, channelName);
                } else {
                    future = entry.subscribe(codec, channelName);
                }
                future.addListener((ChannelFutureListener) future1 -> {
                    if (!future1.isSuccess()) {...}
                    connectionManager.newTimeout(timeout ->
                            subscribeFuture.cancel(false),
                            config.getTimeout(), TimeUnit.MILLISECONDS);
                });
            });
        });
    }

該方法中支線內(nèi)容不表述,主要看方法 entry.subscribe(codec, channelName),最后進(jìn)入RedisPubSubConnection.async(...)方法,就是發(fā)送SUBSCRIBE指令的流程:

zrangebyscore和zrange指令

訂閱指令SUBSCRIBE發(fā)出后,在QueueTransferTask.start()方法里添加的監(jiān)聽(tīng)器觸發(fā)了,就會(huì)執(zhí)行pushTask()

pushTaskAsync()方法執(zhí)行完(lua腳本執(zhí)行完),就會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù)scheduleTask()

    ......
    protected abstract RTopic getTopic();
    protected abstract RFuture<Long> pushTaskAsync();
    private void pushTask() {
        // 這個(gè)抽象方法在之前構(gòu)建RedissonDelayedQueue對(duì)象的構(gòu)造函數(shù)里有實(shí)現(xiàn),最后返回元素過(guò)期時(shí)間
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

BLPOP指令

當(dāng)RedissonDelayedQueue延遲隊(duì)列構(gòu)造完成后,會(huì)調(diào)用延遲隊(duì)列的take()方法獲取延遲任務(wù),然后會(huì)進(jìn)入RedissonBlockingQueue.takeAsync()方法:

    ......
    @Override
    public RFuture<V> takeAsync() {
        return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
    }
    /*
     * (non-Javadoc)
     * @see java.util.concurrent.BlockingQueue#take()
     */
    @Override
    public V take() throws InterruptedException {
        return commandExecutor.getInterrupted(takeAsync());
    }
    ......

注意這里的參數(shù)其值為 BLPOP,很明顯這里就是和我們要找的BLPOP指令有關(guān),所以這里其實(shí)就是客戶端通過(guò)BLPOP指令阻塞式獲取值。在客戶端開(kāi)個(gè)線程一直循環(huán)阻塞獲取元素即可;

看下源碼繼續(xù)向下進(jìn)入CommandAsyncService.writeAsync(...)方法,然后繼續(xù)向下進(jìn)入RedisExecutor.execute()方法:

    ......
    public void execute() {
        if (mainPromise.isCancelled()) {...}
        if (!connectionManager.getShutdownLatch().acquire()) {...}
        codec = getCodec(codec);
        // 獲取連接
        RFuture<RedisConnection> connectionFuture = getConnection();
        RPromise<R> attemptPromise = new RedissonPromise<>();
        mainPromiseListener = (r, e) -> {...};
        if (attempt == 0) {...}
        scheduleRetryTimeout(connectionFuture, attemptPromise);
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {...}
            if (!connectionFuture.isSuccess()) {...}
            // 連接獲取成功就執(zhí)行當(dāng)前方法
            sendCommand(attemptPromise, connection);
            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });
        attemptPromise.onComplete((r, e) -> {...});
    }

該方法里一些支線方法按下不表。中間有個(gè)超時(shí)重試機(jī)制,使用netty的時(shí)間輪,不是重點(diǎn)也就不表述了。

先獲取寫入操作連接對(duì)象任務(wù),然后進(jìn)入方法sendCommand(attemptPromise, connection)發(fā)送

指令指令:"BLPOP",參數(shù):"redisson:queue" "0"

offer添加任務(wù)流程源碼解析 項(xiàng)目啟動(dòng)完成后,添加一個(gè)延遲任務(wù)到redis中,查看redis中所執(zhí)行的指令:

然后跟進(jìn)插入元素offer方法,進(jìn)入RedissonDelayedQueue.offerAsync()方法內(nèi),如下所示:

    ......
    @Override
    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }

其中很明顯一長(zhǎng)串的腳本命令就是在redis中執(zhí)行的指令,基本流程比較簡(jiǎn)單:

"zadd":這是向zset集合"redisson_delay_queue_timeout:{redisson:queue}"里添加元素?cái)?shù)據(jù)(此數(shù)據(jù)被處理過(guò),不用管其結(jié)構(gòu)),排序值為當(dāng)前時(shí)間戳+延遲時(shí)間

"rpush":把元素?cái)?shù)據(jù)推送到list隊(duì)列"redisson:queue"

"zrange":獲取zset集合"redisson_delay_queue_timeout:{redisson:queue}"中排好序的第一個(gè)元素

"publish":如果上述獲取的元素是本次插入的元素,那就發(fā)布通知隊(duì)列"redisson_delay_queue_channel:{redisson:queue}",內(nèi)容為當(dāng)前元素的過(guò)期時(shí)間,這樣做是為了減少本次元素到期的時(shí)間差。

最后定時(shí)器源碼解析

定時(shí)器任務(wù)主要是通過(guò)監(jiān)聽(tīng)器監(jiān)聽(tīng)到了有新的客戶端訂閱或元素通知發(fā)布出來(lái)時(shí),就會(huì)執(zhí)行pushTask()和scheduleTask(...)方法:

    ......
    private int messageListenerId;
    private int statusListenerId;
    public void start() {
        RTopic schedulerTopic = getTopic();
        // 當(dāng)有新的客戶端訂閱schedulerTopic,就是觸發(fā)執(zhí)行pushTask()方法
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        // 當(dāng)redis有新的消息通知,就會(huì)觸發(fā)scheduleTask(...)方法,startTime為上述中publish通知的元素過(guò)期時(shí)間
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

pushTask()方法是對(duì)redis延遲隊(duì)列進(jìn)行操作的方法,scheduleTask(...)是netty時(shí)間輪來(lái)控制調(diào)用pushTask()方法,所以pushTask()和scheduleTask()互相調(diào)用。

    ......
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {...}
        if (oldTimeout != null) {...}
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }
    protected abstract RTopic getTopic();
    protected abstract RFuture<Long> pushTaskAsync();
    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

總結(jié):

當(dāng)有新的客戶端進(jìn)行訂閱,就調(diào)用pushTask()方法拉取數(shù)據(jù)放入阻塞隊(duì)列。當(dāng)有信的消息進(jìn)行發(fā)布,就調(diào)用scheduleTask(...)方法,并根據(jù)其過(guò)期時(shí)間判斷是通過(guò)時(shí)間輪延遲調(diào)用還是立即調(diào)用pushTask()方法。最后 redisson延遲隊(duì)列的源碼相對(duì)而言其實(shí)是比較抽象復(fù)雜的,感覺(jué)沒(méi)有其分布式鎖這塊源碼容易解析。但仔細(xì)用心去看,跟著主要方法走還是可以了解其執(zhí)行流程。

以上就是Redisson延遲隊(duì)列執(zhí)行流程源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Redisson延遲隊(duì)列執(zhí)行流程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot 自定義注解異步記錄復(fù)雜日志詳解

    SpringBoot 自定義注解異步記錄復(fù)雜日志詳解

    這篇文章主要為大家介紹了SpringBoot 自定義注解異步記錄復(fù)雜日志詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • springboot @RequiredArgsConstructor的概念與使用方式

    springboot @RequiredArgsConstructor的概念與使用方式

    這篇文章主要介紹了springboot @RequiredArgsConstructor的概念與使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-09-09
  • SpringBoot調(diào)用第三方WebService接口的操作技巧(.wsdl與.asmx類型)

    SpringBoot調(diào)用第三方WebService接口的操作技巧(.wsdl與.asmx類型)

    這篇文章主要介紹了SpringBoot調(diào)第三方WebService接口的操作代碼(.wsdl與.asmx類型 ),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08
  • 為什么ConcurrentHashMap的key value不能為null,map可以?

    為什么ConcurrentHashMap的key value不能為null,map可以?

    這篇文章主要介紹了為什么ConcurrentHashMap的key value不能為null,map可以呢?具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-01-01
  • idea快速搭建springboot項(xiàng)目的操作方法

    idea快速搭建springboot項(xiàng)目的操作方法

    下面小編就為大家分享一篇idea快速搭建springboot項(xiàng)目的操作方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • 通過(guò)面試題解析 Java 類加載機(jī)制

    通過(guò)面試題解析 Java 類加載機(jī)制

    類加載是 Java 語(yǔ)言的一個(gè)創(chuàng)新,也是 Java 語(yǔ)言流行的重要原因之一。它使得 Java 類可以被動(dòng)態(tài)加載到 Java 虛擬機(jī)中并執(zhí)行。下面小編和大家來(lái)一起學(xué)習(xí)一下吧
    2019-05-05
  • Spring中的AOP動(dòng)態(tài)代理源碼詳解

    Spring中的AOP動(dòng)態(tài)代理源碼詳解

    這篇文章主要介紹了Spring中的AOP動(dòng)態(tài)代理源碼詳解,AOP即面向切面編程也稱面向方面編程,它是面向?qū)ο缶幊蘋OP的一種補(bǔ)充,目前已成為一種比較成熟的編程方式,本文就其源碼進(jìn)行解析,需要的朋友可以參考下
    2023-09-09
  • 使用fileupload組件實(shí)現(xiàn)文件上傳功能

    使用fileupload組件實(shí)現(xiàn)文件上傳功能

    這篇文章主要為大家詳細(xì)介紹了使用fileupload實(shí)現(xiàn)文件上傳功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-10-10
  • Java多線程局域網(wǎng)聊天室的實(shí)現(xiàn)

    Java多線程局域網(wǎng)聊天室的實(shí)現(xiàn)

    在學(xué)習(xí)了一個(gè)學(xué)期的java以后,搞了一個(gè)多線程的聊天室,熟悉了一下服務(wù)器和客戶機(jī)的操作。感興趣的小伙伴們可以參考一下
    2021-06-06
  • Spring Boot配置Swagger的實(shí)現(xiàn)代碼

    Spring Boot配置Swagger的實(shí)現(xiàn)代碼

    這篇文章主要介紹了Spring Boot配置Swagger的實(shí)現(xiàn)代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-12-12

最新評(píng)論