使用Java實現(xiàn)一個簡單的定時器
1. 概要
說到定時任務(wù),大家都不會模式,前幾天在看到 Java 的 Timer 定時(延時)任務(wù),就順著 Timer - ScheduledThreadPoolExecutor - RocketMQ - Netty 這些框架看了下定時任務(wù)的一些核心邏輯實現(xiàn),現(xiàn)在就寫一個系列的文章來記錄下這些源碼的實現(xiàn)過程,如果有錯誤的地方歡迎指出。
定時任務(wù)在我們平時的項目中都會多多少少有接觸,在很多場景下都非常有用,比如:
- 每天批量處理數(shù)據(jù)、生成報告等
- 會議提醒、郵件發(fā)送等
- 訂單支付超時退款、清理臨時文件、日志文件…
下面在介紹切入正題之前我們先自己實現(xiàn)一個最簡單的定時/延時器
2. 注意問題
要實現(xiàn)一個定時任務(wù),首先我們要考慮以下幾點問題
- 如何讓任務(wù)按照執(zhí)行的時間排序(什么樣的集合)
- 定時任務(wù)到期之后如何執(zhí)行
- 如何實現(xiàn)任務(wù)執(zhí)行多次
帶著這三個問題,我們來考慮定時器的實現(xiàn)
首先是第一個問題,什么樣的集合能滿足我們的要求,需要滿足按照固定順序自動排序調(diào)整任務(wù),用 JDK 的 PriorityQueue 優(yōu)先隊列就可以了,只需要設(shè)置按照任務(wù)的執(zhí)行時間排序,當(dāng)我們把任務(wù)添加到隊列里面,隊列就會按照任務(wù)的執(zhí)行時間自動排序,這部分代碼就不用我們考慮了
第二個問題,我們可以額外啟動一個任務(wù)處理線程專門去循環(huán)隊列處理里面的到期任務(wù),如果任務(wù)沒到期就阻塞等待,如果到期了就喚醒執(zhí)行任務(wù)
第三個問題,添加任務(wù)的時候可以設(shè)置一個 Count 表示任務(wù)需要執(zhí)行的次數(shù),添加任務(wù)的時候就可以 for 循環(huán)去遍歷添加
3. 實現(xiàn)
3.1 任務(wù)定義
首先定義一個任務(wù) bean,里面包括任務(wù)的執(zhí)行時間和執(zhí)行的任務(wù):
class Task { // 執(zhí)行時間 private long executeTime; // 任務(wù) Runnable runnable; public Task(long executeTime, Runnable runnable) { this.executeTime = executeTime; this.runnable = runnable; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Runnable getRunnable() { return runnable; } public void setRunnable(Runnable runnable) { this.runnable = runnable; } }
3.2 添加任務(wù)的方法
public boolean add(long addTime, Runnable runnable, int count) { long now = System.currentTimeMillis(); synchronized (queue) { for (int i = 0; i < count; i++) { // 計算任務(wù)的執(zhí)行時間 long newTime = now + addTime * (i + 1); // 如果小于 now,說明溢出了 if (newTime < now) { throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit"); } Task min = getMin(); if (min == null || newTime < min.executeTime) { // 加入任務(wù)隊列 queue.offer(new Task(newTime, runnable)); // 喚醒任務(wù)線程 queue.notifyAll(); continue; } // 加入任務(wù)隊列 queue.offer(new Task(newTime, runnable)); } } return true; }
首先計算任務(wù)的執(zhí)行時間,判斷一下是否會溢出,如果溢出就拋出異常
再獲取隊首任務(wù),如果添加的新任務(wù)執(zhí)行時間比隊首任務(wù)更塊,調(diào)用 queue.notifyAll() 喚醒線程去執(zhí)行任務(wù)
將任務(wù)加入隊列中
3.3 任務(wù)線程
Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + ": 啟動 execute-job-thread 線程"); while (true) { try { synchronized (queue) { if (queue.isEmpty()) { this.queue.wait(); } long now = System.currentTimeMillis(); while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) { Task task = queue.poll(); // 線程池執(zhí)行任務(wù) executor.execute(task.runnable); } if (queue.isEmpty()) { this.queue.wait(); } else { long div = queue.peek().executeTime - now; this.queue.wait(div); } } } catch (Exception e) { throw new RuntimeException(e); } } }, "execute-job-thread");
線程在一個死循環(huán)里面不斷遍歷隊列看有沒有任務(wù)值
- 如果沒有任務(wù),就阻塞等待
- 被喚醒后代表有任務(wù),這時候獲取當(dāng)前時間,獲取所有的過期的任務(wù)依次執(zhí)行
- 執(zhí)行完之后根據(jù)隊列中是否還有任務(wù)來判斷要阻塞多長時間
3.4 整體代碼
PQueueService 代碼如下:
public class PQueueService { private static final int MAX_COUNT = Integer.MAX_VALUE; private final PriorityQueue<Task> queue = new PriorityQueue<>((o1, o2) -> { return Long.compare(o1.executeTime, o2.executeTime); }); static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 32, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16)); Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + ": 啟動 execute-job-thread 線程"); while (true) { try { synchronized (queue) { if (queue.isEmpty()) { this.queue.wait(); } long now = System.currentTimeMillis(); while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) { Task task = queue.poll(); // 線程池執(zhí)行任務(wù) executor.execute(task.runnable); } if (queue.isEmpty()) { this.queue.wait(); } else { long div = queue.peek().executeTime - now; this.queue.wait(div); } } } catch (Exception e) { throw new RuntimeException(e); } } }, "execute-job-thread"); public void init() { thread.start(); } public Task getMin() { if (queue.isEmpty()) { return null; } synchronized (queue) { return queue.peek(); } } public Task pollMin() { if (queue.isEmpty()) { return null; } synchronized (queue) { return queue.poll(); } } public boolean add(long addTime, Runnable runnable, int count) { long now = System.currentTimeMillis(); synchronized (queue) { for (int i = 0; i < count; i++) { long newTime = now + addTime * (i + 1); if (newTime < now) { throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit"); } Task min = getMin(); if (min == null || newTime < min.executeTime) { // 加入任務(wù)隊列 queue.offer(new Task(newTime, runnable)); // 喚醒任務(wù)線程 queue.notifyAll(); continue; } // 加入任務(wù)隊列 queue.offer(new Task(newTime, runnable)); } } return true; } class Task { private long executeTime; Runnable runnable; public Task(long executeTime, Runnable runnable) { this.executeTime = executeTime; this.runnable = runnable; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Runnable getRunnable() { return runnable; } public void setRunnable(Runnable runnable) { this.runnable = runnable; } } }
Timer 類代碼如下:
public class Timer { public static final Logger logger = LoggerFactory.getLogger(Timer.class); PQueueService pQueueService; public Timer() { pQueueService = new PQueueService(); pQueueService.init(); } /** * 執(zhí)行任務(wù) */ public void fixExecuteOne(int seconds, TimeUnit timeUnit, Runnable runnable, int count) { if (timeUnit.ordinal() != TimeUnit.SECONDS.ordinal()) { throw new RuntimeException("unknown unit"); } long time = seconds * 1000L; pQueueService.add(time, runnable, count); } public static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { Timer timer = new Timer(); // 1.添加一個10s后執(zhí)行的任務(wù) timer.fixExecuteOne(10, TimeUnit.SECONDS, () -> { System.out.println("10s任務(wù) ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); // 2.添加一個3s后執(zhí)行的任務(wù) timer.fixExecuteOne(3, TimeUnit.SECONDS, () -> { System.out.println("3s任務(wù) ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); // 3.添加一個20s后執(zhí)行的任務(wù) timer.fixExecuteOne(20, TimeUnit.SECONDS, () -> { System.out.println("20s任務(wù) ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); // 4.添加一個1s后執(zhí)行的任務(wù) timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任務(wù)1 ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); // 5.添加一個1s后執(zhí)行的任務(wù) timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任務(wù)2 ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); // 6.添加一個1s后執(zhí)行的任務(wù) timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任務(wù)3 ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 2); try { Thread.sleep(30000); // 7.添加一個1s后執(zhí)行的任務(wù) timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任務(wù)3 ===> " + Thread.currentThread().getName() + ": " + "執(zhí)行了!!!!!, 當(dāng)前時間:" + simpleDateFormat.format(new Date())); }, 1); Thread.sleep(30000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
執(zhí)行結(jié)果:
execute-job-thread: 啟動 execute-job-thread 線程
1s任務(wù)2 ===> pool-1-thread-2: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:21
1s任務(wù)1 ===> pool-1-thread-1: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:21
1s任務(wù)3 ===> pool-1-thread-3: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:21
1s任務(wù)1 ===> pool-1-thread-4: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:22
1s任務(wù)2 ===> pool-1-thread-5: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:22
1s任務(wù)3 ===> pool-1-thread-6: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:22
3s任務(wù) ===> pool-1-thread-7: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:23
3s任務(wù) ===> pool-1-thread-8: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:26
10s任務(wù) ===> pool-1-thread-9: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:30
10s任務(wù) ===> pool-1-thread-10: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:40
20s任務(wù) ===> pool-1-thread-11: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:40
30s后添加的1s任務(wù) ===> pool-1-thread-12: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:17:51
20s任務(wù) ===> pool-1-thread-13: 執(zhí)行了!!!!!, 當(dāng)前時間:2024-11-23 21:18:00
4. 存在問題
上面實現(xiàn)了一個簡單的定時器,但是這個定時器還是存在下面一些問題:
- 功能比較簡單,沒有其他的 remove 等操作
- 任務(wù)線程拿到任務(wù)之后會丟到另外一個線程池中執(zhí)行,如果線程池中任務(wù)比較多,就會阻塞導(dǎo)致執(zhí)行時間不準(zhǔn)
- 代碼中對 queue 進行了加鎖,但是一個任務(wù)可以添加到多個 queue,這里沒有做并發(fā)處理
- …
總之,這就是一個簡單的 Timer Demo,主要了解下定時器/延時器的執(zhí)行過程, 但是其實 JDK 里面的定時器/延時器實現(xiàn)流程都大差不差,下一篇文章,我們就來看看 Timer 的用法和源碼
到此這篇關(guān)于使用Java實現(xiàn)一個簡單的定時器的文章就介紹到這了,更多相關(guān)Java定時器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud輪詢拉取注冊表與服務(wù)發(fā)現(xiàn)流程詳解
這篇文章主要介紹了SpringCloud輪詢拉取注冊表與服務(wù)發(fā)現(xiàn),現(xiàn)在很多創(chuàng)業(yè)公司都開始往springcloud靠了,可能是由于文檔和組件比較豐富的原因吧,畢竟是一款目前來說比較完善的微服務(wù)架構(gòu)2022-11-11使用Spring Boot的LoggersEndpoint管理日志級別
這篇文章主要為大家介紹了使用Spring Boot的LoggersEndpoint管理日志級別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-11-11Spring 實現(xiàn)excel及pdf導(dǎo)出表格示例
本篇文章主要介紹了Spring 實現(xiàn)excel及pdf導(dǎo)出表格示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-03-03Java實現(xiàn)猜數(shù)字小游戲(有次數(shù)限制)
這篇文章主要為大家詳細介紹了Java實現(xiàn)猜數(shù)字小游戲,有次數(shù)限制,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-05-05