一分鐘掌握J(rèn)ava?Quartz定時(shí)任務(wù)
前言
前幾篇介紹了單體架構(gòu)的定時(shí)任務(wù)解決方式,但是現(xiàn)代軟件架構(gòu)由于業(yè)務(wù)復(fù)雜度高,業(yè)務(wù)的耦合性太強(qiáng),已經(jīng)由單體架構(gòu)拆分成了分布式架構(gòu)。因此,定時(shí)任務(wù)的架構(gòu)也隨之修改。而Quartz是分布式定時(shí)任務(wù)解決方案中使用簡(jiǎn)單,結(jié)構(gòu)清晰,且不依賴(lài)第三方分布式調(diào)度中間件的。上車(chē),mars醬帶你車(chē)?yán)锛?xì)說(shuō)~
角色介紹
Quartz入門(mén)使用的角色不多,三個(gè)角色足夠,分別是:
Scheduler
:調(diào)度器。用來(lái)負(fù)責(zé)任務(wù)的調(diào)度;
Job
:任務(wù)。這是一個(gè)接口,業(yè)務(wù)代碼繼承Job接口并實(shí)現(xiàn)它的execute
方法,是業(yè)務(wù)執(zhí)行的主體部分;
Trigger
: 觸發(fā)器。也是個(gè)接口,有兩個(gè)觸發(fā)器比較關(guān)鍵,一個(gè)是SimpleTrigger
,另一個(gè)是CronTrigger
。前者支持簡(jiǎn)單的定時(shí),比如:按時(shí)、按秒等;后者直接支持cron表達(dá)式。下面我們從官方的源代碼入手,看看Quartz如何做到分布式的。
官方例子
官方源代碼down下來(lái)之后,有個(gè)examples文件夾:
example1是入門(mén)級(jí)中最簡(jiǎn)單的。就兩個(gè)java文件,一個(gè)HelloJob:
package org.quartz.examples.example1; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * <p> * This is just a simple job that says "Hello" to the world. * </p> * * @author Bill Kratzer */ public class HelloJob implements Job { private static Logger _log = LoggerFactory.getLogger(HelloJob.class); /** * <p> * Empty constructor for job initilization * </p> * <p> * Quartz requires a public empty constructor so that the * scheduler can instantiate the class whenever it needs. * </p> */ public HelloJob() { } /** * <p> * Called by the <code>{@link org.quartz.Scheduler}</code> when a * <code>{@link org.quartz.Trigger}</code> fires that is associated with * the <code>Job</code>. * </p> * * @throws JobExecutionException * if there is an exception while executing the job. */ public void execute(JobExecutionContext context) throws JobExecutionException { // Say Hello to the World and display the date/time _log.info("Hello World! - " + new Date()); } }
另一個(gè)SimpleExample:
package org.quartz.examples.example1; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.Trigger; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import static org.quartz.DateBuilder.evenMinuteDate; import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; /** * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in * Quartz. * * @author Bill Kratzer */ public class SimpleExample { public void run() throws Exception { Logger log = LoggerFactory.getLogger(SimpleExample.class); log.info("------- Initializing ----------------------"); // 1. 創(chuàng)建一個(gè)scheduler SchedulerFactory sf = new StdSchedulerFactory(); Scheduler sched = sf.getScheduler(); log.info("------- Initialization Complete -----------"); // computer a time that is on the next round minute Date runTime = evenMinuteDate(new Date()); log.info("------- Scheduling Job -------------------"); // 2. 指定一個(gè)job JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build(); // 3. 指定一個(gè)trigger Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build(); // 4. 綁定job和trigger sched.scheduleJob(job, trigger); log.info(job.getKey() + " will run at: " + runTime); // 5. 執(zhí)行 sched.start(); log.info("------- Started Scheduler -----------------"); // wait long enough so that the scheduler as an opportunity to // run the job! log.info("------- Waiting 65 seconds... -------------"); try { // wait 65 seconds to show job Thread.sleep(65L * 1000L); // executing... } catch (Exception e) { // } // shut down the scheduler log.info("------- Shutting Down ---------------------"); sched.shutdown(true); log.info("------- Shutdown Complete -----------------"); } public static void main(String[] args) throws Exception { SimpleExample example = new SimpleExample(); example.run(); } }
整個(gè)SimpleExample只有五個(gè)步驟:
- 創(chuàng)建Scheduler,這是一個(gè)調(diào)度器,例子中使用調(diào)度器工廠來(lái)創(chuàng)建一個(gè)調(diào)度器;
- 創(chuàng)建一個(gè)Job。實(shí)際上Job就是那個(gè)HelloJob,但是這里把HelloJob丟給了JobDetail對(duì)象,Job接口本身只有一個(gè)execute函數(shù),沒(méi)有其他的屬性了,如果需要附加其他屬性,JobDetail就支持,比如我們需要往Job中傳遞參數(shù),JobDetail中提供了一個(gè)JobDataMap。當(dāng)Job在運(yùn)行的時(shí)候,execute函數(shù)里面的就能獲取到JobDetail對(duì)象,并將設(shè)置的數(shù)據(jù)傳遞給Job接口的實(shí)現(xiàn);
- 創(chuàng)建一個(gè)Trigger。Trigger對(duì)象主責(zé)是任務(wù)的執(zhí)行時(shí)間,比如官方例子中的startAt函數(shù),就指定了具體的運(yùn)行時(shí)間,還有startNow(立即執(zhí)行);
- 用scheduler綁定Job和Trigger;
- 執(zhí)行scheduler。
Quartz的使用是不是簡(jiǎn)單又清晰?Job是任務(wù),單一職責(zé),不做任何其他事情。Trigger負(fù)責(zé)執(zhí)行的頻率等等屬性。Scheduler負(fù)責(zé)按照Trigger的規(guī)則去執(zhí)行Job的內(nèi)容。各自部分的功能符合單一原則。
但是,到這里都不是分布式的方式,依然是單體架構(gòu)的。那么,Quartz如何做到分布式的呢?
Quartz如何分布式?
Quartz的分布式實(shí)現(xiàn)方式并不依賴(lài)其他分布式協(xié)調(diào)管理中間件完成,而是使用數(shù)據(jù)鎖來(lái)實(shí)現(xiàn)。使用數(shù)據(jù)做協(xié)調(diào)管理中間件的唯一的前提是:需要把集群的每臺(tái)機(jī)器時(shí)間校對(duì)一致。
Quartz數(shù)據(jù)庫(kù)核心表如下:
表名 | 功能描述 |
---|---|
QRTZ_CALENDARS | 存儲(chǔ)Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存儲(chǔ)CronTrigger,包括Cron表達(dá)式和時(shí)區(qū)信息 |
QRTZ_FIRED_TRIGGERS | 存儲(chǔ)與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲(chǔ)已暫停的Trigger組的信息 |
QRTZ_SCHEDULER_STATE | 存儲(chǔ)少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實(shí)例 |
QRTZ_LOCKS | 存儲(chǔ)程序的悲觀鎖的信息 |
QRTZ_JOB_DETAILS | 存儲(chǔ)每一個(gè)已配置的Job的詳細(xì)信息 |
QRTZ_JOB_LISTENERS | 存儲(chǔ)有關(guān)已配置的JobListener的信息 |
QRTZ_SIMPLE_TRIGGERS | 存儲(chǔ)簡(jiǎn)單的Trigger,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù) |
QRTZ_BLOG_TRIGGERS | Trigger作為Blob類(lèi)型存儲(chǔ) |
QRTZ_TRIGGER_LISTENERS | 存儲(chǔ)已配置的TriggerListener的信息 |
QRTZ_TRIGGERS | 存儲(chǔ)已配置的Trigger的信息 |
字體加粗的QRTZ_LOCKS表是一個(gè)悲觀鎖的存儲(chǔ)實(shí)現(xiàn),Quartz認(rèn)為每條記錄都可能會(huì)產(chǎn)生并發(fā)沖突。以上表的SQL可以在quartz目錄中找到:
找到自己喜歡的數(shù)據(jù)庫(kù)品牌,并創(chuàng)建好表即可。
跟著官方例子看源碼
我們從Hello的execute方法開(kāi)始,反著找,繼續(xù)看看分布式的方式如何實(shí)現(xiàn)。為什么反著找呢?因?yàn)檫@里是我們業(yè)務(wù)實(shí)現(xiàn)的主體內(nèi)容,Quartz框架最終必須要調(diào)用到這個(gè)execute的具體實(shí)現(xiàn)的。我們找到調(diào)用execute的地方有很多處:
從類(lèi)名來(lái)分析,DirectoryScanJob看著是目錄掃描任務(wù),F(xiàn)ileScanJob直譯是文件掃描任務(wù),SendMailJob是發(fā)送郵件任務(wù),最后只剩那個(gè)JobRunShell,畢竟翻譯過(guò)來(lái)叫“任務(wù)運(yùn)行の核心”啊。進(jìn)入JobRunShell,找到調(diào)用execute函數(shù)的部分,execute函數(shù)被包裹在一個(gè)一百三十多行長(zhǎng)又長(zhǎng)的run函數(shù)中:
public void run() { qs.addInternalSchedulerListener(this); try { // ...省略很多源代碼 do { // ...省略很多源代碼 try { begin(); } catch (SchedulerException se) { // ... 省略源代碼 } // ... 省略源代碼 try { log.debug("Calling execute on job " + jobDetail.getKey()); // 這里負(fù)責(zé)執(zhí)行job的execute函數(shù) job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { // ... 省略源代碼 } catch (Throwable e) { // ... 省略源代碼 } // ...省略很多源代碼 try { complete(true); } catch (SchedulerException se) { // ... 省略源代碼 } // ...省略很多源代碼 } while (true); } finally { qs.removeInternalSchedulerListener(this); } }
可以看到run中間的execute被夾在一個(gè)begin函數(shù)和comlete函數(shù)中,而begin和complete的實(shí)現(xiàn)是一個(gè)基于JTA事務(wù)的JTAJobRunShell的實(shí)現(xiàn)來(lái)完成的。JobRunShell是一個(gè)Runnable接口的實(shí)現(xiàn),那么剛剛的run方法,必定在某處啟用了線程(池)的start方法。
mars醬繼續(xù)跟蹤查找源代碼,在QuartzSchedulerThread中的run函數(shù)中,找到JobRunShell的調(diào)用部分:
@Override public void run() { int acquiresFailed = 0; while (!halted.get()) { // ...省略很多源代碼 // 源代碼279行 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); // ...省略很多源代碼 if(availThreadCount > 0) { // ...省略很多源代碼 // 取下一個(gè)trigger,周期是30秒取一次 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); // ...省略很多源代碼 // 觸發(fā)trigger List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); // ...省略很多源代碼 // 釋放trigger,當(dāng)bndle的結(jié)果是null就釋放trigger if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // ...省略很多源代碼 JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } // 這里調(diào)用JobRunShell if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // ...省略很多源代碼 } } } }
QuartzSchedulerThread的run函數(shù)就是核心處理流程了,qsRsrcs.getThreadPool().runInThread(shell)
內(nèi)部就根據(jù)具體的SimpleThreadPool或者ZeroSizeThreadPool來(lái)執(zhí)行run函數(shù),while循環(huán)基本就是不停的在輪詢(xún)不斷的去拿trigger,然后判斷trigger的時(shí)間是不是到了,再按照時(shí)間trigger的時(shí)間規(guī)則執(zhí)行Job,最后再標(biāo)記為完成、釋放trigger。
Trigger的處理
上面的邏輯都是通過(guò)qsRsrcs.getJobStore()
得到的對(duì)象去處理Trigger的,返回對(duì)象是JobStore。任意查看qsRsrcs.getJobStore()
調(diào)用的函數(shù),比如:releaseAcquiredTriggerJobStore,它的實(shí)現(xiàn)有兩個(gè)是比較重要的:一個(gè)是RAMJobStore,一個(gè)是JobStoreSupport。前者是RAM作為存儲(chǔ)介質(zhì),作者還特意寫(xiě)上了這樣一段注釋?zhuān)?/p>
This class implements a JobStore that utilizes RAM as its storage device.
As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this JobStore should not be used if true persistence between program shutdowns is required.
這段英文的央視翻譯:
這個(gè)類(lèi)實(shí)現(xiàn)了一個(gè)使用RAM作為存儲(chǔ)設(shè)備的JobStore。
您應(yīng)該知道,這樣做的后果是訪問(wèn)速度非??欤菙?shù)據(jù)是完全不穩(wěn)定的——因此,如果需要在程序關(guān)閉之間實(shí)現(xiàn)真正的持久性,則不應(yīng)該使用這個(gè)JobStore。
而且內(nèi)存存儲(chǔ)也無(wú)法分布式處理吧?所以,mars醬選擇了觀看JobStoreSupport:
從import可以知道,這個(gè)玩意是連接了數(shù)據(jù)庫(kù)的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger這些方法負(fù)責(zé)具體trigger的相關(guān)操作,都最終會(huì)執(zhí)行到JobStoreSupport的第3844行的executeInNonManagedTXLock函數(shù):
/** * Execute the given callback having optionally acquired the given lock. * This uses the non-managed transaction connection. * * @param lockName The name of the lock to acquire, for example * "TRIGGER_ACCESS". If null, then no lock is acquired, but the * lockCallback is still executed in a non-managed transaction. */ protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { // If we aren't using db locks, then delay getting DB connection // until after acquiring the lock since it isn't needed. if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { releaseLock(lockName, transOwner); } finally { cleanupConnection(conn); } } }
整體的過(guò)程簡(jiǎn)要說(shuō)明就是:獲取數(shù)據(jù)庫(kù)連接,給需要執(zhí)行的trigger加鎖,處理完之后再釋放鎖。
結(jié)合起來(lái)
結(jié)合前面的流程來(lái)看,一個(gè)調(diào)度器在執(zhí)行前如果涉及到分布式的情況,流程如下:
- 首先要獲取QUARTZ_LOCKS表中對(duì)應(yīng)的鎖(在
executeInNonManagedTXLock
函數(shù)的getLockHandler().obtainLock(conn, lockName)
中); - 獲取鎖后執(zhí)行QuartzSchedulerThread中的JobRunShell,完成任務(wù)的執(zhí)行;
- 最后QuartzSchedulerThread中調(diào)用
triggeredJobComplete
函數(shù),鎖被釋放,在executeInNonManagedTXLock
函數(shù)的releaseLock(lockName, transOwner)
中執(zhí)行;
集群中的每一個(gè)調(diào)度器實(shí)例都遵循這樣的操作流程。
總結(jié)
Quartz 是一款用于分布式系統(tǒng)的高性能調(diào)度框架,它采用了數(shù)據(jù)庫(kù)作為分布式鎖機(jī)制來(lái)保證同一時(shí)刻只有一個(gè) Scheduler 實(shí)例訪問(wèn)數(shù)據(jù)庫(kù)中的 Trigger。
在 Quartz 中,調(diào)度器線程會(huì)爭(zhēng)搶訪問(wèn)數(shù)據(jù)庫(kù)中的 Trigger,以確保在同一時(shí)刻只有一個(gè)調(diào)度器線程執(zhí)行某個(gè) Trigger 的操作。如果有多個(gè)調(diào)度器線程同時(shí)嘗試訪問(wèn)同一個(gè) Trigger,它們會(huì)相互等待對(duì)方釋放鎖。當(dāng)一個(gè)調(diào)度器線程獲得了鎖,它就可以訪問(wèn)數(shù)據(jù)庫(kù)并執(zhí)行相應(yīng)的操作。
另外,Quartz 還采用了悲觀鎖的策略來(lái)避免死鎖的發(fā)生。當(dāng)一個(gè)調(diào)度器線程嘗試獲取鎖時(shí),如果鎖已經(jīng)被其他線程占用,那么這個(gè)線程會(huì)等待,直到有線程釋放了鎖。如果在等待過(guò)程中沒(méi)有其他線程釋放鎖,那么這個(gè)線程就會(huì)一直等待下去,直到調(diào)度器重新分配了鎖。
總之,Quartz 的分布式調(diào)度原理是通過(guò)數(shù)據(jù)庫(kù)鎖和悲觀鎖來(lái)實(shí)現(xiàn)的,以保證同一時(shí)刻只有一個(gè)調(diào)度器線程訪問(wèn)數(shù)據(jù)庫(kù)中的 Trigger,從而提高系統(tǒng)的性能和可靠性。
以上就是一分鐘掌握J(rèn)ava Quartz定時(shí)任務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Java Quartz定時(shí)任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot中使用Quartz設(shè)置定時(shí)任務(wù)的實(shí)例詳解
- Springboot集成Quartz實(shí)現(xiàn)定時(shí)任務(wù)代碼實(shí)例
- SpringBoot集成quartz實(shí)現(xiàn)定時(shí)任務(wù)
- Springboot整合quartz實(shí)現(xiàn)多個(gè)定時(shí)任務(wù)實(shí)例
- SpringBoot+Quartz實(shí)現(xiàn)定時(shí)任務(wù)的代碼模版分享
- 如何對(duì)quartz定時(shí)任務(wù)設(shè)置結(jié)束時(shí)間
相關(guān)文章
Java中創(chuàng)建對(duì)象的5種方式總結(jié)
本篇文章主要介紹了Java中創(chuàng)建對(duì)象的5種方式總結(jié),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-02-02詳解MyBatis-Puls中saveBatch批量添加慢的問(wèn)題
本文主要介紹了詳解MyBatis-Puls中saveBatch批量添加慢的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01Java8中的LocalDateTime和Date一些時(shí)間操作方法
這篇文章主要介紹了Java8中的LocalDateTime和Date一些時(shí)間操作方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-04-04ThreadLocal?在上下文傳值場(chǎng)景實(shí)踐源碼
這篇文章主要為大家介紹了ThreadLocal在上下文傳值場(chǎng)景下的實(shí)踐源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03Spring?boot配置綁定和配置屬性校驗(yàn)的方式詳解
這篇文章主要介紹了Spring?boot配置綁定和配置屬性校驗(yàn),SpringBoot 提供了2 種方式進(jìn)行配置綁定,即使用 @ConfigurationProperties 注解和使用 @Value 注解,需要的朋友可以參考下2022-05-05Mybatis如何使用ognl表達(dá)式實(shí)現(xiàn)動(dòng)態(tài)sql
這篇文章主要介紹了Mybatis使用ognl表達(dá)式實(shí)現(xiàn)動(dòng)態(tài)sql的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java實(shí)現(xiàn)鼠標(biāo)拖放功能的方法
這篇文章主要介紹了Java實(shí)現(xiàn)鼠標(biāo)拖放功能的方法,很實(shí)用的功能,需要的朋友可以參考下2014-07-07