一分鐘掌握Java?ElasticJob分布式定時任務
前言
ElasticJob 是面向互聯(lián)網(wǎng)生態(tài)和海量任務的分布式調(diào)度解決方案。 它通過彈性調(diào)度、資源管控、以及任務治理的功能,打造一個適用于互聯(lián)網(wǎng)場景的分布式調(diào)度解決方案,并通過開放的架構(gòu)設(shè)計,提供多元化的任務生態(tài)。 它的各個產(chǎn)品使用統(tǒng)一的任務 API,開發(fā)者僅需一次開發(fā),即可隨意部署。
架構(gòu)
elasticjob由兩個相互獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成組成,這是ElasticJob-Lite 的架構(gòu)圖:
從架構(gòu)圖可以看到,左上角App1和App2兩個業(yè)務模塊中的Elastic-Job往zk中注冊了信息,右邊的Elastic-Job-Lite是監(jiān)聽了zk的,因此,整個任務的調(diào)度是由zk來完成的。下面的console通過Rest API去獲取zk中的信息,得到調(diào)度數(shù)據(jù)和日志,并存盤。
這是ElasticJob-Cloud的架構(gòu)圖:
ElasticJob-Cloud的調(diào)度是依賴Mesos的,從架構(gòu)圖的理解,Mesos和zk結(jié)合做好任務調(diào)度,再分發(fā)給Mesos的代理并執(zhí)行。
功能和特性
以下是ElasticJob的特性優(yōu)點
- 支持任務在分布式場景下的分片和高可用
- 能夠水平擴展任務的吞吐量和執(zhí)行效率
- 任務處理能力隨資源配備彈性伸縮
- 優(yōu)化任務和資源調(diào)度
- 相同任務聚合至相同的執(zhí)行器統(tǒng)一處理
- 動態(tài)調(diào)配追加資源至新分配的任務
- 失效轉(zhuǎn)移
- 錯過任務重新執(zhí)行
- 分布式環(huán)境下任務自動診斷和修復
- 基于有向無環(huán)圖 (DAG) 的任務依賴
- 基于有向無環(huán)圖 (DAG) 的任務項目依賴
- 可擴展的任務類型統(tǒng)一接口
- 支持豐富的任務類型庫--包括數(shù)據(jù)流、腳本、HTTP、文件、大數(shù)據(jù)
- 易于對接業(yè)務任務--兼容 Spring IOC
- 任務管控端
- 任務事件追蹤
- 注冊中心管理
入門角色
既然這么多優(yōu)點,我們就入門試試吧。入門elasticjob-lite也繼承了Quartz框架,同樣的很簡單,只要三個角色:
SimpleJob
:任務主體。如果用過Quartz,那么應該能夠理解這個,基本上和Quartz的Job接口類似,只要實現(xiàn)一個execute方法就行了,入門用這個就行;
JobConfiguration
:任務配置。同樣的可以理解為類似Quartz框架中的Trigger,最重要的就是配置任務的執(zhí)行頻率;
ScheduleJobBootstrap
:調(diào)度主體。這個一樣,參考Quartz框架中的Scheduler對象,它把任務和配置結(jié)合起來,任務按照配置中的頻率執(zhí)行。
寫個例子
我們創(chuàng)建這三種角色,首先創(chuàng)建任務主體:
import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * (這個類的說明) * * @author mars醬 */ public class MarsSimpleJob implements SimpleJob { @Override public void execute(final ShardingContext shardingContext) { System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "就是這么簡單~"); } }
再創(chuàng)建任務配置:
import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; import java.util.Objects; /** * (這個類的說明) * * @author mars醬 */ public class JobConfigurationBuilder { public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) { JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3) .cron(cronExpression) .shardingItemParameters("0=a,1=b,2=c"); if (Objects.nonNull(tracingConfig)) { builder.addExtraConfigurations(tracingConfig); } return builder.build(); } }
最后創(chuàng)建調(diào)度器,并執(zhí)行:
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; /** * (這個類的說明) * * @author mars醬 */ public final class SchedulerMain { private static final int EMBED_ZOOKEEPER_PORT = 4181; private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT; private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java"; // CHECKSTYLE:OFF public static void main(final String[] args) { // 內(nèi)嵌zk服務 EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); CoordinatorRegistryCenter regCenter = setUpRegistryCenter(); // 簡單作業(yè) setUpSimpleJob(regCenter, null); } private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; } private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) { new ScheduleJobBootstrap(regCenter, new MarsSimpleJob(), JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule(); } }
運行的效果:
截圖中Item
是處理的分片項,Thread
是當前線程的id,看到了Quartz框架的影子...。
任務執(zhí)行流程
既然能成功運行,我們看看內(nèi)部的處理邏輯吧。Mars醬本機并沒有安裝zk,所以copy了官方的例子,在程序運行前先啟用了一個內(nèi)嵌的zk服務:
EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
這個只能在模擬的時候使用,千萬不能拿去放生產(chǎn)環(huán)境。接下來就是注冊中心的配置了,我們需要的是CoordinatorRegistryCenter對象:
private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; }
好了,zk的部分處理完成,下面就是直接SchedulerJobBootstrap的部分了。
ScheduleJobBootstrap初始化
ScheduleJobBootstrap的初始化在例子中需要三個參數(shù):
CoordinatorRegistryCenter
:這個是協(xié)調(diào)用的注冊中心。是一個接口類,它的實現(xiàn)在ElasticJob里面只有一個ZookeeperRegisterCenter對象,未來是不是會支持其他的注冊中心呢?
ElasticJob
: Mars醬理解為任務對象。但是ElasticJob這個對象本身是個空接口,有兩個子接口SimpleJob
和DataflowJob
,前者Mars醬的理解是和Quartz中的Job對象類似,只要實現(xiàn)execute函數(shù)就行,后者有需要實現(xiàn)兩個接口,一個fetchData
獲取數(shù)據(jù),一個processData
處理數(shù)據(jù)。所以,ElasticJob這個接口留空,是為了還有其他擴展吧?
JobConfiguration
:彈性任務配置項。構(gòu)建這個對象不能直接設(shè)置,只能用buider的方式構(gòu)建。需要配置的屬性很多,但是核心屬性大致就是幾個:任務名稱、分片數(shù)、執(zhí)行頻率、分片參數(shù)。JobConfiguration的所有屬性如下:
屬性名 | 說明 |
---|---|
String jobName | 任務名稱 |
String cron | cron表達式 |
String timeZone | 任務運行的時區(qū) |
int shardingTotalCount | 任務分片總數(shù) |
String shardingItemParameters | 分片序號和參數(shù),多個鍵值對之間用逗號分隔,從0開始,但是不能大于或等于任務分片的總數(shù) |
String jobParameter | 任務自定義任務參數(shù) |
boolean monitorExecution | 是否監(jiān)聽執(zhí)行 |
boolean failover | 是否啟用故障轉(zhuǎn)移。開啟表示如果任務在一次任務執(zhí)行中途宕機,允許將該次未完成的任務在另一任務節(jié)點上補償執(zhí)行 |
boolean misfire | 不發(fā)火。哈哈,其實是是否開啟錯過任務重新執(zhí)行 |
int maxTimeDiffSeconds | 最大時差 |
int reconcileIntervalMinutes | 間隔時長 |
String jobShardingStrategyType | 任務分片策略類型,總共三種 |
String jobExecutorServiceHandlerType | 任務執(zhí)行程序服務處理程序類型 |
String jobErrorHandlerType | 任務錯誤處理類型 |
Collection jobListenerTypes | 任務監(jiān)聽類型 |
Collection extraConfigurations | 附加配置信息 |
String description | 任務描述 |
Properties props | 擴展用屬性值 |
boolean disabled | 是否禁用 |
boolean overwrite | 是否覆蓋 |
String label | 標簽 |
boolean staticSharding | 是否支持靜態(tài)分片 |
ScheduleJobBootstrap執(zhí)行
同樣的,例子中的MarsSimpleJob的execute函數(shù),最終會被ElasticJob框架調(diào)用,我們按照被執(zhí)行的反向順序往上找。MarsSimpleJob是繼承SimpleJob
的, 而SimpleJob
的execute函數(shù)是被SimpleJobExecutor
所調(diào)用:
/** * Simple job executor. */ public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> { @Override public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) { // 這里調(diào)用execute函數(shù) elasticJob.execute(shardingContext); } @Override public Class<SimpleJob> getElasticJobClass() { return SimpleJob.class; } }
再繼續(xù)往上找,process的核心流程就是在ElasticJobExecutor
里面了,調(diào)用process的部分在ElasticJobExcutor
中幾個重載的process方法調(diào)用的,兩個process函數(shù)完成不同的功能,調(diào)用SimpleExecutor的process部分是這樣:
@SuppressWarnings("unchecked") private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) { jobFacade.postJobExecutionEvent(startEvent); log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item); JobExecutionEvent completeEvent; try { // 這里調(diào)用SimpleJobExecutor的process jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item)); completeEvent = startEvent.executionSuccess(); log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item); jobFacade.postJobExecutionEvent(completeEvent); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause)); jobFacade.postJobExecutionEvent(completeEvent); itemErrorMessages.put(item, ExceptionUtils.transform(cause)); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }
上面這個process負責最終任務的執(zhí)行部分,由JobItemExecutor對象調(diào)用,SimpleJobExecutor被JobItemExecutor接口定義。整個這個proces由guava包的EventBus處理消息事件,執(zhí)行之前有startEvent,執(zhí)行完成有completeEvent,異常也有對應的失敗event,方面架構(gòu)圖中存盤事件日志、ELK日志收集動作。
調(diào)用這個process的部分,由另一個process完成,長這樣的:
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet(); if (1 == items.size()) { int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item); process(jobConfig, shardingContexts, item, jobExecutionEvent); return; } CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); ExecutorService executorService = executorContext.get(ExecutorService.class); if (executorService.isShutdown()) { return; } // 提交給線程池執(zhí)行 executorService.submit(() -> { try { process(jobConfig, shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } }); } try { latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }
上面這個process負責把分片參數(shù)依次組裝好,設(shè)置好JobExecutionEvent中的ip、主機名等參數(shù),然后放入線程池中去執(zhí)行。再往上,看現(xiàn)在這個process被調(diào)用的部分:
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { if (shardingContexts.getShardingItemParameters().isEmpty()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName())); return; } // 往注冊中心注冊ShardingContexts信息 jobFacade.registerJobBegin(shardingContexts); String taskId = shardingContexts.getTaskId(); // 發(fā)送跟蹤日志,標記任務正在運行 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, ""); try { // 調(diào)用process process(jobConfig, shardingContexts, executionSource); } finally { // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure // 告知注冊中心任務完成 jobFacade.registerJobCompleted(shardingContexts); if (itemErrorMessages.isEmpty()) { // 沒有失敗信息,通知任務完成 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, ""); } else { // 否則通知失敗 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString()); itemErrorMessages.clear(); } } }
方法execute從注冊中心注冊ShardingContext信息,并發(fā)送跟蹤日志事件,然后調(diào)用process,最后發(fā)送跟蹤消息標記任務完成。再有一個重載的execute方法調(diào)用上面這個execute方法,如下:
public void execute() { // job的配置信息 JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true); executorContext.reloadIfNecessary(jobConfig); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 這里有玄機 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 發(fā)送時間消息總線 jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName())); if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(), shardingContexts.getShardingItemParameters().keySet())); return; } try { // 任務執(zhí)行的前置流程 jobFacade.beforeJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 調(diào)用上面的execute方法 execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE); } // 故障轉(zhuǎn)移 jobFacade.failoverIfNecessary(); try { // 任務執(zhí)行的后置流程 jobFacade.afterJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }
這個execute就由Quartz的JobRunShell調(diào)用了,Quartz的調(diào)用的過程在 Java | 一分鐘掌握定時任務 | 6 - Quartz定時任務里面還好Mars醬分析過了。
執(zhí)行流程總結(jié)
那么,追蹤完源代碼,大致的流程就應該是如下:
1.組裝基本參數(shù)(任務、頻率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任務屬性 -> 4.設(shè)置各種facade -> 5.初始化ElasticJobExecutor -> 6.調(diào)用scheduler執(zhí)行任務 -> 7.獲取任務執(zhí)行器(SimpleJobExecutor) -> 8.各種校驗邏輯 -> 9. 處理分片參數(shù) -> 10. 設(shè)置任務為運行狀態(tài) -> 11. 提交任務到線程池 -> 12.執(zhí)行任務 -> 13.處理任務后續(xù)邏輯
任務的調(diào)度過程由zk完成,取決于zk的任務調(diào)度策略吧?如果一臺機器的定時運行時掛了,zk會轉(zhuǎn)移到另一臺運行中的機器中去。-- Mars醬
分片的策略
任務的分片策略,用于將任務在分布式環(huán)境下分解成為任務使用。
SPI 名稱 | 詳細說明 |
---|---|
JobShardingStrategy | 作業(yè)分片策略接口 |
已知實現(xiàn)類 | 詳細說明 |
---|---|
AverageAllocationJobShardingStrategy | 根據(jù)分片項平均分片 |
OdevitySortByNameJobShardingStrategy | 根據(jù)任務名稱哈希值的奇偶數(shù)決定按照任務服務器 IP 升序或是降序的方式分片 |
RoundRobinByNameJobShardingStrategy | 根據(jù)任務名稱輪詢分片 |
那么任務的分片策略在哪里使用的呢?就在代碼中注釋的“這里有玄機”那行。在getShardingContexts的方法中會調(diào)用ShardingService,它會去獲取JobConfiguration
中配置的分片策略方式:
public void shardingIfNecessary() { List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); if (!isNeedSharding() || availableJobInstances.isEmpty()) { return; } if (!leaderService.isLeaderUntilBlock()) { blockUntilShardingCompleted(); return; } waitingOtherShardingItemCompleted(); JobConfiguration jobConfig = configService.load(false); int shardingTotalCount = jobConfig.getShardingTotalCount(); log.debug("Job '{}' sharding begin.", jobName); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); resetShardingInfo(shardingTotalCount); // 獲取任務分片策略 JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType()); jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))); log.debug("Job '{}' sharding complete.", jobName); }
如果不設(shè)置,默認使用的是平均分片策略。
以上就是一分鐘掌握Java ElasticJob分布式定時任務的詳細內(nèi)容,更多關(guān)于Java ElasticJob定時任務的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解springMVC兩種方式實現(xiàn)多文件上傳及效率比較
本篇文章介紹了springMVC兩種方式實現(xiàn)多文件上傳及效率比較。springMVC實現(xiàn)多文件上傳有兩種,一種是字節(jié)流的方式進行文件上傳,另外一種是使用springMVC包裝好的解析器進行上傳,有興趣的可以了解一下。2016-12-12使用java swing實現(xiàn)qq登錄界面示例分享
這篇文章主要介紹了使用java swing實現(xiàn)qq登錄界面示例,需要的朋友可以參考下2014-04-04SpringBoot 使用Mongo的GridFs實現(xiàn)分布式文件存儲操作
這篇文章主要介紹了Spring Boot 使用Mongo的GridFs實現(xiàn)分布式文件存儲操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10java中調(diào)用GDAL DLL的實現(xiàn)方法
本篇文章是對java中調(diào)用GDAL DLL的實現(xiàn)方法進行了詳細的分析介紹,需要的朋友參考下2013-05-05Java8的DateTimeFormatter與SimpleDateFormat的區(qū)別詳解
這篇文章主要介紹了Java8的DateTimeFormatter與SimpleDateFormat的區(qū)別詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03