一分鐘掌握Java?Quartz定時任務(wù)
前言
前幾篇介紹了單體架構(gòu)的定時任務(wù)解決方式,但是現(xiàn)代軟件架構(gòu)由于業(yè)務(wù)復(fù)雜度高,業(yè)務(wù)的耦合性太強,已經(jīng)由單體架構(gòu)拆分成了分布式架構(gòu)。因此,定時任務(wù)的架構(gòu)也隨之修改。而Quartz是分布式定時任務(wù)解決方案中使用簡單,結(jié)構(gòu)清晰,且不依賴第三方分布式調(diào)度中間件的。上車,mars醬帶你車里細說~
角色介紹
Quartz入門使用的角色不多,三個角色足夠,分別是:
Scheduler:調(diào)度器。用來負責(zé)任務(wù)的調(diào)度;
Job:任務(wù)。這是一個接口,業(yè)務(wù)代碼繼承Job接口并實現(xiàn)它的execute方法,是業(yè)務(wù)執(zhí)行的主體部分;
Trigger: 觸發(fā)器。也是個接口,有兩個觸發(fā)器比較關(guān)鍵,一個是SimpleTrigger,另一個是CronTrigger。前者支持簡單的定時,比如:按時、按秒等;后者直接支持cron表達式。下面我們從官方的源代碼入手,看看Quartz如何做到分布式的。
官方例子
官方源代碼down下來之后,有個examples文件夾:

example1是入門級中最簡單的。就兩個java文件,一個HelloJob:
package org.quartz.examples.example1;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
* <p>
* This is just a simple job that says "Hello" to the world.
* </p>
*
* @author Bill Kratzer
*/
public class HelloJob implements Job {
private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
/**
* <p>
* Empty constructor for job initilization
* </p>
* <p>
* Quartz requires a public empty constructor so that the
* scheduler can instantiate the class whenever it needs.
* </p>
*/
public HelloJob() {
}
/**
* <p>
* Called by the <code>{@link org.quartz.Scheduler}</code> when a
* <code>{@link org.quartz.Trigger}</code> fires that is associated with
* the <code>Job</code>.
* </p>
*
* @throws JobExecutionException
* if there is an exception while executing the job.
*/
public void execute(JobExecutionContext context)
throws JobExecutionException {
// Say Hello to the World and display the date/time
_log.info("Hello World! - " + new Date());
}
}另一個SimpleExample:
package org.quartz.examples.example1;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import static org.quartz.DateBuilder.evenMinuteDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
* Quartz.
*
* @author Bill Kratzer
*/
public class SimpleExample {
public void run() throws Exception {
Logger log = LoggerFactory.getLogger(SimpleExample.class);
log.info("------- Initializing ----------------------");
// 1. 創(chuàng)建一個scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
log.info("------- Initialization Complete -----------");
// computer a time that is on the next round minute
Date runTime = evenMinuteDate(new Date());
log.info("------- Scheduling Job -------------------");
// 2. 指定一個job
JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
// 3. 指定一個trigger
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
// 4. 綁定job和trigger
sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + runTime);
// 5. 執(zhí)行
sched.start();
log.info("------- Started Scheduler -----------------");
// wait long enough so that the scheduler as an opportunity to
// run the job!
log.info("------- Waiting 65 seconds... -------------");
try {
// wait 65 seconds to show job
Thread.sleep(65L * 1000L);
// executing...
} catch (Exception e) {
//
}
// shut down the scheduler
log.info("------- Shutting Down ---------------------");
sched.shutdown(true);
log.info("------- Shutdown Complete -----------------");
}
public static void main(String[] args) throws Exception {
SimpleExample example = new SimpleExample();
example.run();
}
}整個SimpleExample只有五個步驟:
- 創(chuàng)建Scheduler,這是一個調(diào)度器,例子中使用調(diào)度器工廠來創(chuàng)建一個調(diào)度器;
- 創(chuàng)建一個Job。實際上Job就是那個HelloJob,但是這里把HelloJob丟給了JobDetail對象,Job接口本身只有一個execute函數(shù),沒有其他的屬性了,如果需要附加其他屬性,JobDetail就支持,比如我們需要往Job中傳遞參數(shù),JobDetail中提供了一個JobDataMap。當(dāng)Job在運行的時候,execute函數(shù)里面的就能獲取到JobDetail對象,并將設(shè)置的數(shù)據(jù)傳遞給Job接口的實現(xiàn);
- 創(chuàng)建一個Trigger。Trigger對象主責(zé)是任務(wù)的執(zhí)行時間,比如官方例子中的startAt函數(shù),就指定了具體的運行時間,還有startNow(立即執(zhí)行);
- 用scheduler綁定Job和Trigger;
- 執(zhí)行scheduler。
Quartz的使用是不是簡單又清晰?Job是任務(wù),單一職責(zé),不做任何其他事情。Trigger負責(zé)執(zhí)行的頻率等等屬性。Scheduler負責(zé)按照Trigger的規(guī)則去執(zhí)行Job的內(nèi)容。各自部分的功能符合單一原則。
但是,到這里都不是分布式的方式,依然是單體架構(gòu)的。那么,Quartz如何做到分布式的呢?
Quartz如何分布式?
Quartz的分布式實現(xiàn)方式并不依賴其他分布式協(xié)調(diào)管理中間件完成,而是使用數(shù)據(jù)鎖來實現(xiàn)。使用數(shù)據(jù)做協(xié)調(diào)管理中間件的唯一的前提是:需要把集群的每臺機器時間校對一致。
Quartz數(shù)據(jù)庫核心表如下:
| 表名 | 功能描述 |
|---|---|
| QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
| QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區(qū)信息 |
| QRTZ_FIRED_TRIGGERS | 存儲與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息 |
| QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
| QRTZ_SCHEDULER_STATE | 存儲少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實例 |
| QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
| QRTZ_JOB_DETAILS | 存儲每一個已配置的Job的詳細信息 |
| QRTZ_JOB_LISTENERS | 存儲有關(guān)已配置的JobListener的信息 |
| QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù) |
| QRTZ_BLOG_TRIGGERS | Trigger作為Blob類型存儲 |
| QRTZ_TRIGGER_LISTENERS | 存儲已配置的TriggerListener的信息 |
| QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
字體加粗的QRTZ_LOCKS表是一個悲觀鎖的存儲實現(xiàn),Quartz認為每條記錄都可能會產(chǎn)生并發(fā)沖突。以上表的SQL可以在quartz目錄中找到:

找到自己喜歡的數(shù)據(jù)庫品牌,并創(chuàng)建好表即可。
跟著官方例子看源碼
我們從Hello的execute方法開始,反著找,繼續(xù)看看分布式的方式如何實現(xiàn)。為什么反著找呢?因為這里是我們業(yè)務(wù)實現(xiàn)的主體內(nèi)容,Quartz框架最終必須要調(diào)用到這個execute的具體實現(xiàn)的。我們找到調(diào)用execute的地方有很多處:

從類名來分析,DirectoryScanJob看著是目錄掃描任務(wù),F(xiàn)ileScanJob直譯是文件掃描任務(wù),SendMailJob是發(fā)送郵件任務(wù),最后只剩那個JobRunShell,畢竟翻譯過來叫“任務(wù)運行の核心”啊。進入JobRunShell,找到調(diào)用execute函數(shù)的部分,execute函數(shù)被包裹在一個一百三十多行長又長的run函數(shù)中:
public void run() {
qs.addInternalSchedulerListener(this);
try {
// ...省略很多源代碼
do {
// ...省略很多源代碼
try {
begin();
} catch (SchedulerException se) {
// ... 省略源代碼
}
// ... 省略源代碼
try {
log.debug("Calling execute on job " + jobDetail.getKey());
// 這里負責(zé)執(zhí)行job的execute函數(shù)
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
// ... 省略源代碼
} catch (Throwable e) {
// ... 省略源代碼
}
// ...省略很多源代碼
try {
complete(true);
} catch (SchedulerException se) {
// ... 省略源代碼
}
// ...省略很多源代碼
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}可以看到run中間的execute被夾在一個begin函數(shù)和comlete函數(shù)中,而begin和complete的實現(xiàn)是一個基于JTA事務(wù)的JTAJobRunShell的實現(xiàn)來完成的。JobRunShell是一個Runnable接口的實現(xiàn),那么剛剛的run方法,必定在某處啟用了線程(池)的start方法。
mars醬繼續(xù)跟蹤查找源代碼,在QuartzSchedulerThread中的run函數(shù)中,找到JobRunShell的調(diào)用部分:
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
// ...省略很多源代碼
// 源代碼279行
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
// ...省略很多源代碼
if(availThreadCount > 0) {
// ...省略很多源代碼
// 取下一個trigger,周期是30秒取一次
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
// ...省略很多源代碼
// 觸發(fā)trigger
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
// ...省略很多源代碼
// 釋放trigger,當(dāng)bndle的結(jié)果是null就釋放trigger
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// ...省略很多源代碼
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 這里調(diào)用JobRunShell
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// ...省略很多源代碼
}
}
}
}QuartzSchedulerThread的run函數(shù)就是核心處理流程了,qsRsrcs.getThreadPool().runInThread(shell)內(nèi)部就根據(jù)具體的SimpleThreadPool或者ZeroSizeThreadPool來執(zhí)行run函數(shù),while循環(huán)基本就是不停的在輪詢不斷的去拿trigger,然后判斷trigger的時間是不是到了,再按照時間trigger的時間規(guī)則執(zhí)行Job,最后再標記為完成、釋放trigger。
Trigger的處理
上面的邏輯都是通過qsRsrcs.getJobStore()得到的對象去處理Trigger的,返回對象是JobStore。任意查看qsRsrcs.getJobStore()調(diào)用的函數(shù),比如:releaseAcquiredTriggerJobStore,它的實現(xiàn)有兩個是比較重要的:一個是RAMJobStore,一個是JobStoreSupport。前者是RAM作為存儲介質(zhì),作者還特意寫上了這樣一段注釋:
This class implements a JobStore that utilizes RAM as its storage device.
As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this JobStore should not be used if true persistence between program shutdowns is required.
這段英文的央視翻譯:
這個類實現(xiàn)了一個使用RAM作為存儲設(shè)備的JobStore。
您應(yīng)該知道,這樣做的后果是訪問速度非常快,但是數(shù)據(jù)是完全不穩(wěn)定的——因此,如果需要在程序關(guān)閉之間實現(xiàn)真正的持久性,則不應(yīng)該使用這個JobStore。
而且內(nèi)存存儲也無法分布式處理吧?所以,mars醬選擇了觀看JobStoreSupport:

從import可以知道,這個玩意是連接了數(shù)據(jù)庫的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger這些方法負責(zé)具體trigger的相關(guān)操作,都最終會執(zhí)行到JobStoreSupport的第3844行的executeInNonManagedTXLock函數(shù):
/**
* Execute the given callback having optionally acquired the given lock.
* This uses the non-managed transaction connection.
*
* @param lockName The name of the lock to acquire, for example
* "TRIGGER_ACCESS". If null, then no lock is acquired, but the
* lockCallback is still executed in a non-managed transaction.
*/
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
final T result = txCallback.execute(conn);
try {
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
@Override
public Boolean execute(Connection conn) throws JobPersistenceException {
return txValidator.validate(conn, result);
}
})) {
throw e;
}
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if(sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}整體的過程簡要說明就是:獲取數(shù)據(jù)庫連接,給需要執(zhí)行的trigger加鎖,處理完之后再釋放鎖。
結(jié)合起來
結(jié)合前面的流程來看,一個調(diào)度器在執(zhí)行前如果涉及到分布式的情況,流程如下:
- 首先要獲取QUARTZ_LOCKS表中對應(yīng)的鎖(在
executeInNonManagedTXLock函數(shù)的getLockHandler().obtainLock(conn, lockName)中); - 獲取鎖后執(zhí)行QuartzSchedulerThread中的JobRunShell,完成任務(wù)的執(zhí)行;
- 最后QuartzSchedulerThread中調(diào)用
triggeredJobComplete函數(shù),鎖被釋放,在executeInNonManagedTXLock函數(shù)的releaseLock(lockName, transOwner)中執(zhí)行;
集群中的每一個調(diào)度器實例都遵循這樣的操作流程。
總結(jié)
Quartz 是一款用于分布式系統(tǒng)的高性能調(diào)度框架,它采用了數(shù)據(jù)庫作為分布式鎖機制來保證同一時刻只有一個 Scheduler 實例訪問數(shù)據(jù)庫中的 Trigger。
在 Quartz 中,調(diào)度器線程會爭搶訪問數(shù)據(jù)庫中的 Trigger,以確保在同一時刻只有一個調(diào)度器線程執(zhí)行某個 Trigger 的操作。如果有多個調(diào)度器線程同時嘗試訪問同一個 Trigger,它們會相互等待對方釋放鎖。當(dāng)一個調(diào)度器線程獲得了鎖,它就可以訪問數(shù)據(jù)庫并執(zhí)行相應(yīng)的操作。
另外,Quartz 還采用了悲觀鎖的策略來避免死鎖的發(fā)生。當(dāng)一個調(diào)度器線程嘗試獲取鎖時,如果鎖已經(jīng)被其他線程占用,那么這個線程會等待,直到有線程釋放了鎖。如果在等待過程中沒有其他線程釋放鎖,那么這個線程就會一直等待下去,直到調(diào)度器重新分配了鎖。
總之,Quartz 的分布式調(diào)度原理是通過數(shù)據(jù)庫鎖和悲觀鎖來實現(xiàn)的,以保證同一時刻只有一個調(diào)度器線程訪問數(shù)據(jù)庫中的 Trigger,從而提高系統(tǒng)的性能和可靠性。
以上就是一分鐘掌握Java Quartz定時任務(wù)的詳細內(nèi)容,更多關(guān)于Java Quartz定時任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解MyBatis-Puls中saveBatch批量添加慢的問題
本文主要介紹了詳解MyBatis-Puls中saveBatch批量添加慢的問題,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Java8中的LocalDateTime和Date一些時間操作方法
這篇文章主要介紹了Java8中的LocalDateTime和Date一些時間操作方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04
Mybatis如何使用ognl表達式實現(xiàn)動態(tài)sql
這篇文章主要介紹了Mybatis使用ognl表達式實現(xiàn)動態(tài)sql的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06

