欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java實現(xiàn)延遲/超時/定時問題

 更新時間:2025年04月10日 10:31:03   作者:yzhSWJ  
這篇文章主要介紹了java實現(xiàn)延遲/超時/定時問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

java實現(xiàn)延遲/超時/定時

java 每間隔5秒執(zhí)行一次,一共執(zhí)行5次然后結(jié)束

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        AtomicInteger count = new AtomicInteger(0);

        Runnable task = () -> {
            System.out.println("Task executed at " + System.currentTimeMillis());
            // 這里放置你的任務(wù)代碼
            // ...

            // 當執(zhí)行次數(shù)達到5次后,關(guān)閉線程池
            if (count.getAndIncrement() == 4) { // 注意:計數(shù)從0開始,所以這里是4
                executor.shutdown();
            }
        };

        // 每隔5秒執(zhí)行一次
        executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
        
        // 確保線程池結(jié)束,避免程序無法退出
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

任務(wù)代碼執(zhí)行時間超過5秒會怎樣

在Java中,如果你使用ScheduledExecutorService.scheduleAtFixedRate()方法,并且你的任務(wù)執(zhí)行時間超過了預(yù)定的周期(在這個例子中是5秒),那么下一次任務(wù)將在當前任務(wù)完成后立即開始執(zhí)行,而不是等待下一個完整周期。

也就是說,如果某個任務(wù)執(zhí)行了6秒,則下一次任務(wù)不會等到10秒(即上一個任務(wù)結(jié)束后的5秒)才開始,而是在第一個任務(wù)完成之后立刻啟動。因此,任務(wù)的執(zhí)行將不再嚴格遵循每隔5秒執(zhí)行一次的規(guī)律,而是盡量保持每兩次任務(wù)執(zhí)行之間的開始時間間隔為5秒。

這可能導(dǎo)致連續(xù)的任務(wù)執(zhí)行時間重疊,尤其是在任務(wù)執(zhí)行時間不穩(wěn)定或持續(xù)較長的情況下。如果你希望每次任務(wù)都至少間隔5秒,即使前一個任務(wù)超時也要等到下個周期再執(zhí)行,你應(yīng)該考慮使用ScheduledExecutorService.scheduleWithFixedDelay()方法替代。

如果你想確保每次任務(wù)執(zhí)行之間至少有5秒的間隔,即使前一個任務(wù)超時也等待到下個周期再開始執(zhí)行,你可以使用ScheduledExecutorService.scheduleWithFixedDelay()方法替代。以下是修改后的代碼

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        AtomicInteger count = new AtomicInteger(0);

        Runnable task = () -> {
            long startTime = System.currentTimeMillis();
            System.out.println("Task started at " + startTime);
            
            // 這里放置你的任務(wù)代碼
            // ...
            // 假設(shè)這里是模擬耗時的任務(wù)
            try {
                Thread.sleep(7000); // 模擬任務(wù)執(zhí)行時間超過5秒的情況
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            System.out.println("Task ended at " + System.currentTimeMillis());

            // 當執(zhí)行次數(shù)達到5次后,取消任務(wù)
            if (count.getAndIncrement() == 4) { 
                executor.shutdown();
            }
        };

        // 每隔5秒(即使前一次任務(wù)執(zhí)行超過5秒)開始執(zhí)行下一次任務(wù)
        executor.scheduleWithFixedDelay(task, 0, 5, TimeUnit.SECONDS);
        
        // 確保線程池結(jié)束,避免程序無法退出
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在這個示例中,即使任務(wù)執(zhí)行時間超過了5秒,下一次任務(wù)也會在當前任務(wù)結(jié)束后至少等待5秒才開始執(zhí)行

executor.shutdownNow() 方法用于嘗試停止正在執(zhí)行的任務(wù),并取消尚未開始執(zhí)行的任務(wù)。此方法會立即關(guān)閉線程池,同時返回一個包含所有已提交但尚未開始執(zhí)行的任務(wù)列表。

調(diào)用 shutdownNow() 時會發(fā)生以下情況:

  1. 立即停止當前正在執(zhí)行的任務(wù)(如果可能的話)。對于某些任務(wù)來說,這可能意味著中斷正在運行的任務(wù)。因此,你的任務(wù)應(yīng)該能夠正確處理中斷請求(通過檢查 Thread.currentThread().isInterrupted())并盡可能快速且干凈地退出。
  2. 取消所有等待隊列中的未開始執(zhí)行的任務(wù)。
  3. 調(diào)用 shutdownNow() 后,線程池將不再接受新的任務(wù)。

與之相對的是 executor.shutdown() 方法,它不會立即停止正在執(zhí)行的任務(wù),而是等待所有已提交的任務(wù)完成后才關(guān)閉線程池,且不再接受新任務(wù)。但是,shutdown() 方法并不會中斷正在執(zhí)行的任務(wù)。

scheduleAtFixedRate 和 scheduleWithFixedDelay 的區(qū)別

scheduleAtFixedRate()

  • 此方法按照固定的頻率執(zhí)行任務(wù)。
  • 即使前一次任務(wù)尚未完成(如果任務(wù)執(zhí)行時間超過了預(yù)定周期),下一次任務(wù)也會在上一次開始執(zhí)行的時間基礎(chǔ)上加上固定周期后立即啟動。
  • 因此,如果任務(wù)執(zhí)行耗時不一致或較長,連續(xù)的任務(wù)可能會重疊執(zhí)行。

scheduleWithFixedDelay()

  • 此方法確保每次任務(wù)執(zhí)行完成后,都會等待一個固定的延遲時間后再啟動下一次任務(wù)。
  • 即使前一次任務(wù)超時,下一次任務(wù)也會在前一次任務(wù)結(jié)束時刻的基礎(chǔ)上加上指定的延遲時間才開始。
  • 這意味著,無論任務(wù)執(zhí)行所需時間如何,兩次任務(wù)執(zhí)行之間的間隔總是至少等于指定的延遲時間。

總結(jié)來說:

  • 如果你希望任務(wù)按固定的時間間隔開始,而不考慮每個任務(wù)的實際執(zhí)行時間,使用 scheduleAtFixedRate()。
  • 如果你希望每個任務(wù)結(jié)束后有一段固定的“冷靜期”,確保任何時間點相鄰兩次任務(wù)之間至少有一定的時間間隔,那么應(yīng)該使用 scheduleWithFixedDelay()。

DelayQueue

DelayQueue是JDK提供的api,是一個延遲隊列

DelayQueue泛型參數(shù)得實現(xiàn)Delayed接口,Delayed繼承了Comparable接口。

  • getDelay方法返回這個任務(wù)還剩多久時間可以執(zhí)行,小于0的時候說明可以這個延遲任務(wù)到了執(zhí)行的時間了。
  • compareTo這個是對任務(wù)排序的,保證最先到延遲時間的任務(wù)排到隊列的頭。

demo

@Getter
public class SanYouTask implements Delayed {

    private final String taskContent;

    private final Long triggerTime;

    public SanYouTask(String taskContent, Long delayTime) {
        this.taskContent = taskContent;
        this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
    }

}

SanYouTask實現(xiàn)了Delayed接口,構(gòu)造參數(shù)

  • taskContent:延遲任務(wù)的具體的內(nèi)容
  • delayTime:延遲時間,秒為單位

測試

@Slf4j
public class DelayQueueDemo {

    public static void main(String[] args) {
        DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();

        new Thread(() -> {
            while (true) {
                try {
                    SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
                    log.info("獲取到延遲任務(wù):{}", sanYouTask.getTaskContent());
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務(wù)");
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s", 5L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s", 3L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s", 8L));
    }
}

開啟一個線程從DelayQueue中獲取任務(wù),然后提交了三個任務(wù),延遲時間分為別5s,3s,8s。

測試結(jié)果:

成功實現(xiàn)了延遲任務(wù)。

實現(xiàn)原理

  • offer方法在提交任務(wù)的時候,會通過根據(jù)compareTo的實現(xiàn)對任務(wù)進行排序,將最先需要被執(zhí)行的任務(wù)放到隊列頭。
  • take方法獲取任務(wù)的時候,會拿到隊列頭部的元素,也就是隊列中最早需要被執(zhí)行的任務(wù),通過getDelay返回值判斷任務(wù)是否需要被立刻執(zhí)行,如果需要的話,就返回任務(wù),如果不需要就會等待這個任務(wù)到延遲時間的剩余時間,當時間到了就會將任務(wù)返回。

Timer

Timer也是JDK提供的api

demo

@Slf4j
public class TimerDemo {

    public static void main(String[] args) {
        Timer timer = new Timer();
        
        log.info("提交延遲任務(wù)");
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("執(zhí)行延遲任務(wù)");
            }
        }, 5000);
    }

}

通過schedule提交一個延遲時間為5s的延遲任務(wù)

實現(xiàn)原理

提交的任務(wù)是一個TimerTask

public abstract class TimerTask implements Runnable {
    //忽略其它屬性
    
    long nextExecutionTime;
}

TimerTask內(nèi)部有一個nextExecutionTime屬性,代表下一次任務(wù)執(zhí)行的時間,在提交任務(wù)的時候會計算出nextExecutionTime值。

Timer內(nèi)部有一個TaskQueue對象,用來保存TimerTask任務(wù)的,會根據(jù)nextExecutionTime來排序,保證能夠快速獲取到最早需要被執(zhí)行的延遲任務(wù)。

在Timer內(nèi)部還有一個執(zhí)行任務(wù)的線程TimerThread,這個線程就跟DelayQueue demo中開啟的線程作用是一樣的,用來執(zhí)行到了延遲時間的任務(wù)。

所以總的來看,Timer有點像整體封裝了DelayQueue demo中的所有東西,讓用起來簡單點。

雖然Timer用起來比較簡單,但是在阿里規(guī)范中是不推薦使用的,主要是有以下幾點原因:

  • Timer使用單線程來處理任務(wù),長時間運行的任務(wù)會導(dǎo)致其他任務(wù)的延時處理
  • Timer沒有對運行時異常進行處理,一旦某個任務(wù)觸發(fā)運行時異常,會導(dǎo)致整個Timer崩潰,不安全

ScheduledThreadPoolExecutor

由于Timer在使用上有一定的問題,所以在JDK1.5版本的時候提供了ScheduledThreadPoolExecutor,這個跟Timer的作用差不多,并且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單線程和異常崩潰等問題。

demo

@Slf4j
public class ScheduledThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());

        log.info("提交延遲任務(wù)");
        executor.schedule(() -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS);
    }

}

結(jié)果

實現(xiàn)原理

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了線程池,所以可以有很多個線程來執(zhí)行任務(wù)。

ScheduledThreadPoolExecutor在構(gòu)造的時候會傳入一個DelayedWorkQueue阻塞隊列,所以線程池內(nèi)部的阻塞隊列是DelayedWorkQueue。

在提交延遲任務(wù)的時候,任務(wù)會被封裝一個任務(wù)會被封裝成ScheduledFutureTask對象,然后放到DelayedWorkQueue阻塞隊列中。

ScheduledFutureTask實現(xiàn)了前面提到的Delayed接口,所以其實可以猜到DelayedWorkQueue會根據(jù)ScheduledFutureTask對于Delayed接口的實現(xiàn)來排序,所以線程能夠獲取到最早到延遲時間的任務(wù)。

當線程從DelayedWorkQueue中獲取到需要執(zhí)行的任務(wù)之后就會執(zhí)行任務(wù)。

監(jiān)聽Redis過期key

在Redis中,有個發(fā)布訂閱的機制

生產(chǎn)者在消息發(fā)送時需要到指定發(fā)送到哪個channel上,消費者訂閱這個channel就能獲取到消息。圖中channel理解成MQ中的topic。

并且在Redis中,有很多默認的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。這里面就有這么一個channel叫做__keyevent@<db>__:expired,db是指Redis數(shù)據(jù)庫的序號。

當某個Redis的key過期之后,Redis內(nèi)部會發(fā)布一個事件到__keyevent@<db>__:expired這個channel上,只要監(jiān)聽這個事件,那么就可以獲取到過期的key。

所以基于監(jiān)聽Redis過期key實現(xiàn)延遲任務(wù)的原理如下:

  • 將延遲任務(wù)作為key,過期時間設(shè)置為延遲時間
  • 監(jiān)聽__keyevent@<db>__:expired這個channel,那么一旦延遲任務(wù)到了過期時間(延遲時間),那么就可以獲取到這個任務(wù)

demo

Spring已經(jīng)實現(xiàn)了監(jiān)聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。

所以demo寫起來就很簡單了,只需4步即可

依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置文件

spring:
  redis:
    host: 192.168.200.144
    port: 6379

配置類

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener實現(xiàn)了對__keyevent@*__:expiredchannel的監(jiān)聽

當KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時候,會發(fā)布RedisKeyExpiredEvent事件

所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。

對RedisKeyExpiredEvent事件的監(jiān)聽實現(xiàn)MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("獲取到延遲消息:" + new String(body));
    }

}

代碼寫好,啟動應(yīng)用

之后我直接通過Redis命令設(shè)置消息,消息的key為sanyou,值為task,值不重要,過期時間為5s

set sanyou task 
expire sanyou 5

成功獲取到延遲任務(wù)

雖然這種方式可以實現(xiàn)延遲任務(wù),但是這種方式比較多

任務(wù)存在延遲

Redis過期事件的發(fā)布不是指key到了過期時間就發(fā)布,而是key到了過期時間被清除之后才會發(fā)布事件。

而Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

  • 惰性清除。當這個key過期之后,訪問時,這個Key才會被清除
  • 定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除

所以即使key到了過期時間,Redis也不一定會發(fā)送key過期事件,這就到導(dǎo)致雖然延遲任務(wù)到了延遲時間也可能獲取不到延遲任務(wù)。

丟消息太頻繁

Redis實現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機制,當消息發(fā)布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

所以說,假設(shè)服務(wù)重啟期間,某個生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個channel,由于服務(wù)重啟,沒有監(jiān)聽這個channel,那么這個消息自然就丟了。

消息消費只有廣播模式

Redis的發(fā)布訂閱模式消息消費只有廣播模式一種。

所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發(fā)布到這個channel的所有消息。

如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。

所以,如果通過監(jiān)聽channel來獲取延遲任務(wù),那么一旦服務(wù)實例有多個的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開發(fā)量。

接收到所有key的某個事件

這個不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。

當監(jiān)聽了__keyevent@<db>__:expired的channel,那么所有的Redis的key只要發(fā)生了過期事件都會被通知給消費者,不管這個key是不是消費者想接收到的。

所以如果你只想消費某一類消息的key,那么還得自行加一些標記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務(wù)。

Redisson的RDelayedQueue

Redisson他是Redis的兒子(Redis son),基于Redis實現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實現(xiàn),但是除了實現(xiàn)Redis分布式鎖之外,它還實現(xiàn)了延遲隊列的功能。

demo

引入pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency

封裝了一個RedissonDelayQueue類

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延遲任務(wù):{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("添加延遲任務(wù):{} 延遲時間:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

這個類在創(chuàng)建的時候會去初始化延遲隊列,創(chuàng)建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。

當延遲隊列創(chuàng)建之后,會開啟一個延遲任務(wù)的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務(wù)。

添加任務(wù)的時候是通過RDelayedQueue的offer方法添加的。

controller類,通過接口添加任務(wù),延遲時間為5s

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

啟動項目,在瀏覽器輸入如下連接,添加任務(wù)

http://localhost:8080/add?task=sanyou

靜靜等待5s,成功獲取到任務(wù)。

實現(xiàn)原理

如下是Redisson延遲隊列的實現(xiàn)原理

SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時候會拼上前綴。

  • redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時間戳(提交任務(wù)時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執(zhí)行的任務(wù),這個概念很重要
  • redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務(wù),但是研究下來發(fā)現(xiàn)好像沒什么用。。
  • SANYOU,list數(shù)據(jù)類型,被稱為目標隊列,這個里面存放的任務(wù)都是已經(jīng)到了延遲時間的,可以被消費者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務(wù)的
  • redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務(wù)

任務(wù)提交的時候,Redisson會將任務(wù)放到redisson_delay_queue_timeout:SANYOU中,分數(shù)就是提交任務(wù)的時間戳+延遲時間,就是延遲任務(wù)的到期時間戳

Redisson客戶端內(nèi)部通過監(jiān)聽redisson_delay_queue_channel:SANYOU這個channel來提交一個延遲任務(wù),這個延遲任務(wù)能夠保證將redisson_delay_queue_timeout:SANYOU中到了延遲時間的任務(wù)從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標隊列中。

于是消費者就可以從SANYOU這個目標隊列獲取到延遲任務(wù)了。

所以從這可以看出,Redisson的延遲任務(wù)的實現(xiàn)跟前面說的MQ的實現(xiàn)都是殊途同歸,最開始任務(wù)放到中間的一個地方,叫做redisson_delay_queue_timeout:SANYOU,然后會開啟一個類似于定時任務(wù)的一個東西,去判斷這個中間地方的消息是否到了延遲時間,到了再放到最終的目標的隊列供消費者消費。

Redisson的這種實現(xiàn)方式比監(jiān)聽Redis過期key的實現(xiàn)方式更加可靠,因為消息都存在list和sorted set數(shù)據(jù)類型中,所以消息很少丟。

Hutool的SystemTimer

Hutool工具類也提供了延遲任務(wù)的實現(xiàn)SystemTimer

demo

@Slf4j
public class SystemTimerDemo {

    public static void main(String[] args) {
        SystemTimer systemTimer = new SystemTimer();
        systemTimer.start();

        log.info("提交延遲任務(wù)");
        systemTimer.addTask(new TimerTask(() -> log.info("執(zhí)行延遲任務(wù)"), 5000));
    }

}

執(zhí)行結(jié)果

Hutool底層其實也用到了時間輪。

Quartz

quartz是一款開源作業(yè)調(diào)度框架,基于quartz提供的api也可以實現(xiàn)延遲任務(wù)的功能。

demo

依賴

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

SanYouJob實現(xiàn)Job接口,當任務(wù)到達執(zhí)行時間的時候會調(diào)用execute的實現(xiàn),從context可以獲取到任務(wù)的內(nèi)容

@Slf4j
public class SanYouJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        log.info("獲取到延遲任務(wù):{}", jobDataMap.get("delayTask"));
    }
}

測試類

public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException, InterruptedException {
        // 1.創(chuàng)建Scheduler的工廠
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2.從工廠中獲取調(diào)度器實例
        Scheduler scheduler = sf.getScheduler();

        // 6.啟動 調(diào)度器
        scheduler.start();

        // 3.創(chuàng)建JobDetail,Job類型就是上面說的SanYouJob
        JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                .usingJobData("delayTask", "這是一個延遲任務(wù)")
                .build();

        // 4.創(chuàng)建Trigger
        Trigger t = TriggerBuilder.newTrigger()
                //任務(wù)的觸發(fā)時間就是延遲任務(wù)到的延遲時間
                .startAt(DateUtil.offsetSecond(new Date(), 5))
                .build();

        // 5.注冊任務(wù)和定時器
        log.info("提交延遲任務(wù)");
        scheduler.scheduleJob(jb, t);
    }
}

執(zhí)行結(jié)果:

實現(xiàn)原理

核心組件

  • Job:表示一個任務(wù),execute方法的實現(xiàn)是對任務(wù)的執(zhí)行邏輯
  • JobDetail:任務(wù)的詳情,可以設(shè)置任務(wù)需要的參數(shù)等信息
  • Trigger:觸發(fā)器,是用來觸發(fā)業(yè)務(wù)的執(zhí)行,比如說指定5s后觸發(fā)任務(wù),那么任務(wù)就會在5s后觸發(fā)
  • Scheduler:調(diào)度器,內(nèi)部可以注冊多個任務(wù)和對應(yīng)任務(wù)的觸發(fā)器,之后會調(diào)度任務(wù)的執(zhí)行

啟動的時候會開啟一個QuartzSchedulerThread調(diào)度線程,這個線程會去判斷任務(wù)是否到了執(zhí)行時間,到的話就將任務(wù)交給任務(wù)線程池去執(zhí)行。

無限輪詢延遲任務(wù)

無限輪詢的意思就是開啟一個線程不停的去輪詢?nèi)蝿?wù),當這些任務(wù)到達了延遲時間,那么就執(zhí)行任務(wù)。

demo

@Slf4j
public class PollingTaskDemo {

    private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                try {
                    for (DelayTask delayTask : DELAY_TASK_LIST) {
                        if (delayTask.triggerTime <= System.currentTimeMillis()) {
                            log.info("處理延遲任務(wù):{}", delayTask.taskContent);
                            DELAY_TASK_LIST.remove(delayTask);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務(wù)");
        DELAY_TASK_LIST.add(new DelayTask("三友的java日記", 5L));
    }

    @Getter
    @Setter
    public static class DelayTask {

        private final String taskContent;

        private final Long triggerTime;

        public DelayTask(String taskContent, Long delayTime) {
            this.taskContent = taskContent;
            this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
        }
    }

}

任務(wù)可以存在數(shù)據(jù)庫又或者是內(nèi)存,看具體的需求,這里我為了簡單就放在內(nèi)存里了。

執(zhí)行結(jié)果:

這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務(wù)。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Druid如何平行替換為Hikari

    Druid如何平行替換為Hikari

    這篇文章主要介紹了Druid如何平行替換為Hikari問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • java WSDL接口webService實現(xiàn)方式

    java WSDL接口webService實現(xiàn)方式

    這篇文章主要為大家詳細介紹了java WSDL接口webService實現(xiàn)方式的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • lombok @Accessors用法詳解

    lombok @Accessors用法詳解

    這篇文章主要介紹了lombok @Accessors用法詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧
    2020-11-11
  • java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實例

    java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實例

    這篇文章主要介紹了java數(shù)據(jù)結(jié)構(gòu)之二分查找法 binarySearch的實例的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下
    2017-10-10
  • SpringMVC post請求中文亂碼問題解決

    SpringMVC post請求中文亂碼問題解決

    這篇文章主要介紹了SpringMVC post請求中文亂碼問題解決,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友可以參考下
    2019-12-12
  • Java中將MultipartFile和File互轉(zhuǎn)的方法詳解

    Java中將MultipartFile和File互轉(zhuǎn)的方法詳解

    我們在開發(fā)過程中經(jīng)常需要接收前端傳來的文件,通常需要處理MultipartFile格式的文件,今天來介紹一下MultipartFile和File怎么進行優(yōu)雅的互轉(zhuǎn),需要的朋友可以參考下
    2023-10-10
  • Java中classpath講解及使用方式

    Java中classpath講解及使用方式

    本文詳細講解了Java中classpath講解及使用方式,文中通過示例代碼介紹的非常詳細。對大家的學(xué)習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-12-12
  • java實現(xiàn)文本框和文本區(qū)的輸入輸出

    java實現(xiàn)文本框和文本區(qū)的輸入輸出

    這篇文章主要介紹了java實現(xiàn)文本框和文本區(qū)的輸入輸出的方法和具體示例,有需要的小伙伴可以參考下。
    2015-06-06
  • java 全角半角字符轉(zhuǎn)換的方法實例

    java 全角半角字符轉(zhuǎn)換的方法實例

    這篇文章主要介紹了java 全角半角字符轉(zhuǎn)換的方法,大家參考使用吧
    2013-11-11
  • Mybatis如何通過接口實現(xiàn)sql執(zhí)行原理解析

    Mybatis如何通過接口實現(xiàn)sql執(zhí)行原理解析

    為了簡化MyBatis的使用,MyBatis提供了接口方式自動化生成調(diào)用過程,可以大大簡化MyBatis的開發(fā),下面這篇文章主要給大家介紹了關(guān)于Mybatis如何通過接口實現(xiàn)sql執(zhí)行原理解析的相關(guān)資料,需要的朋友可以參考下
    2023-01-01

最新評論