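DolphinScheduler Master Fault Tolerance: A Source Code Analysis
Introduction
Our product recently adopted DolphinScheduler as the task scheduling system for its big data workloads. As a distributed scheduler, DolphinScheduler uses a decentralized, peer-to-peer architecture with multiple Masters and multiple Workers, which avoids overloading a single Master. In such an architecture, fault-tolerance handling is essential whenever a Master or Worker goes down. This article describes how that handling works and walks through the relevant source code.
Fault Tolerance Design
For the detailed design, see the official documentation.
Service fault tolerance relies on ZooKeeper's Watcher mechanism. Each Master watches the registry directories of the other Masters and of the Workers. When it observes a remove event, it performs workflow-instance failover or task-instance failover, depending on the business logic.
The main ZooKeeper directory names are listed below. A quick look is enough for now; each of them appears in the source code later: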
public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";
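[Figure: official architecture diagram]
Master Fault Tolerance Source Code Analysis
Master startup entry point
The run method of MasterServer contains the entry point for fault tolerance: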
@PostConstruct
public void run() throws SchedulerException {
    // init rpc server
    this.masterRPCServer.start();
    // install task plugin
    this.taskPluginManager.loadPlugin();
    // self tolerant
    this.masterRegistryClient.start();
    this.masterRegistryClient.setRegistryStoppable(this);
    this.masterSchedulerBootstrap.init();
    this.masterSchedulerBootstrap.start();
    this.eventExecuteService.start();
    this.failoverExecuteThread.start();
    this.schedulerApi.start();
    ......
}
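Master startup consists of the following main steps:
starting the Netty-based RPC server, loading the task plugins, initializing the fault-tolerance code, initializing task scheduling, and starting the task event handling thread.
Here we only care about the fault-tolerance code, masterRegistryClient.start():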
public void start() {
    try {
        this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
        // master registry
        registry();
        registryClient.addConnectionStateListener(
                new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
    } catch (Exception e) {
        throw new RegistryException("Master registry client start up error", e);
    }
}
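The start method above does three main things: it registers the Master, adds a connection state listener, and subscribes to registry events. Let's look at them one by one.
Master registration at startup
The current Master registers its information in ZooKeeper and starts a heartbeat task that periodically refreshes that information.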
/**
 * Registry the current master server itself to registry.
 */
void registry() {
    logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
    String masterRegistryPath = masterConfig.getMasterRegistryPath();
    // remove before persist
    registryClient.remove(masterRegistryPath);
    registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
    while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
        logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    }
    // sleep 1s, waiting master failover remove
    ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    masterHeartBeatTask.start();
    logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}
The path the Master registers in ZooKeeper is built as follows:
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
// the path is /nodes/master + "/" + ip:listenPort
masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
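To make the path layout concrete, here is a minimal, self-contained sketch of how such a registry path can be built and how the ip:port part can be recovered from it later, when a remove event arrives. The class and both helper methods are hypothetical illustrations, not DolphinScheduler APIs, and the address and port are example values only.

```java
class RegistryPathSketch {
    // Same value as REGISTRY_DOLPHINSCHEDULER_MASTERS in the article.
    static final String MASTERS = "/nodes/master";

    // Hypothetical helper: build the registry path for a master at host:port.
    static String masterRegistryPath(String host, int port) {
        return MASTERS + "/" + host + ":" + port;
    }

    // Hypothetical helper: recover "host:port" from a registry path by taking
    // the last path segment. Returns "" when there is no such segment.
    static String hostFromPath(String path) {
        int idx = path.lastIndexOf('/');
        return (idx < 0 || idx == path.length() - 1) ? "" : path.substring(idx + 1);
    }

    public static void main(String[] args) {
        String path = masterRegistryPath("192.168.1.10", 5678);
        System.out.println(path);               // /nodes/master/192.168.1.10:5678
        System.out.println(hostFromPath(path)); // 192.168.1.10:5678
    }
}
```

Because the node name is the address itself, a watcher that only receives the removed path already knows which server went down.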
The registered data is mainly the Master's own health status, shown below, and it is refreshed periodically:
@Override
public MasterHeartBeat getHeartBeat() {
    return MasterHeartBeat.builder()
            .startupTime(ServerLifeCycleManager.getServerStartupTime())
            .reportTime(System.currentTimeMillis())
            .cpuUsage(OSUtils.cpuUsage())
            .loadAverage(OSUtils.loadAverage())
            .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
            .maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
            .reservedMemory(masterConfig.getReservedMemory())
            .memoryUsage(OSUtils.memoryUsage())
            .diskAvailable(OSUtils.diskAvailable())
            .processId(processId)
            .build();
}
Master listening and subscribing to cluster state
Listen for changes in the connection state between the ZooKeeper client and the cluster:
@Override
public void addConnectionStateListener(ConnectionListener listener) {
    client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}
When the connection between the client and the server changes, for example after a reconnect, the event handler in MasterConnectionStateListener is called:
@Override
public void onUpdate(ConnectionState state) {
    logger.info("Master received a {} event from registry, the current server state is {}", state,
            ServerLifeCycleManager.getServerStatus());
    switch (state) {
        case CONNECTED:
            break;
        case SUSPENDED:
            break;
        case RECONNECTED:
            masterConnectStrategy.reconnect();
            break;
        case DISCONNECTED:
            masterConnectStrategy.disconnect();
            break;
        default:
    }
}
Subscribe to events under /nodes, the top-level directory where Masters and Workers register. The event of interest here is Remove.
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());

@Override
public boolean subscribe(String path, SubscribeListener listener) {
    final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
    treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
    try {
        treeCache.start();
    } catch (Exception e) {
        treeCacheMap.remove(path);
        throw new RegistryException("Failed to subscribe listener for key: " + path, e);
    }
    return true;
}
The subscription is built on TreeCache from the Curator client, which watches data and path changes on a given ZooKeeper path and on all of its descendants. When ZooKeeper reports a change, the callback eventually reaches the notify method of MasterRegistryDataListener:
public void notify(Event event) {
    // path is the changed path under /nodes, either /nodes/master/** or /nodes/worker/**
    final String path = event.path();
    if (Strings.isNullOrEmpty(path)) {
        return;
    }
    // monitor master
    if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
        handleMasterEvent(event);
    } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
        // monitor worker
        handleWorkerEvent(event);
    }
}
notify handles changes under /nodes/master/ and /nodes/worker/ differently.
Master failover flow
Let's first look at what happens when something changes under /nodes/master/ in ZooKeeper:
private void handleMasterEvent(Event event) {
    final String path = event.path();
    switch (event.type()) {
        case ADD:
            logger.info("master node added : {}", path);
            break;
        case REMOVE:
            masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
            break;
        default:
            break;
    }
}
If a master node path is removed, a Master node has failed, and the workflows and tasks it was running need failover.
public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
    logger.info("{} node deleted : {}", nodeType, path);
    if (StringUtils.isEmpty(path)) {
        logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
        return;
    }
    // get the ip:port of the failed node
    String serverHost = registryClient.getHostByEventDataPath(path);
    if (StringUtils.isEmpty(serverHost)) {
        logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
        return;
    }
    try {
        if (!registryClient.exists(path)) {
            logger.info("path: {} not exists", path);
        }
        // failover server
        if (failover) {
            failoverService.failoverServerWhenDown(serverHost, nodeType);
        }
    } catch (Exception e) {
        logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
    }
}

/**
 * failover server when server down
 *
 * @param serverHost server host
 * @param nodeType node type
 */
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
    switch (nodeType) {
        case MASTER:
            LOGGER.info("Master failover starting, masterServer: {}", serverHost);
            masterFailoverService.failoverMaster(serverHost);
            LOGGER.info("Master failover finished, masterServer: {}", serverHost);
            break;
        case WORKER:
            LOGGER.info("Worker failover staring, workerServer: {}", serverHost);
            workerFailoverService.failoverWorker(serverHost);
            LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
            break;
        default:
            break;
    }
}
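Master failover eventually calls masterFailoverService.failoverMaster(serverHost), where serverHost is the ip:port of the failed Master node.
Several Masters may observe the removal of the failed Master, so each one first acquires a distributed lock. Only the Master holding the lock performs the failover at any given time: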
public void failoverMaster(String masterHost) {
    String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
    try {
        registryClient.getLock(failoverPath);
        doFailoverMaster(masterHost);
    } catch (Exception e) {
        LOGGER.error("Master server failover failed, host:{}", masterHost, e);
    } finally {
        registryClient.releaseLock(failoverPath);
    }
}
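The effect of the lock can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: a ReentrantLock per lock path stands in for the registry's distributed lock, a map stands in for the database, and the lock path prefix, the "NULL" marker and all names are hypothetical. It shows that when several surviving Masters react to the same remove event, each instance of the failed host is still failed over exactly once, because the first Master to hold the lock resets the host and the others then find nothing left to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class FailoverLockSketch {
    static final String LOCK_PREFIX = "/lock/failover/master"; // hypothetical lock path prefix
    static final String NULL_HOST = "NULL";                    // assumed "already failed over" marker

    // Stand-in for the distributed lock, keyed by lock path.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Stand-in for the DB: workflow instance id -> host of the owning master.
    private final ConcurrentHashMap<Integer, String> instanceHost = new ConcurrentHashMap<>();
    private final AtomicInteger failoverCount = new AtomicInteger();

    void addInstance(int id, String host) {
        instanceHost.put(id, host);
    }

    // Called by every surviving master that sees the remove event.
    void failoverMaster(String deadHost) {
        ReentrantLock lock = locks.computeIfAbsent(LOCK_PREFIX + "/" + deadHost, k -> new ReentrantLock());
        lock.lock();
        try {
            for (Integer id : instanceHost.keySet()) {
                // Only instances still owned by the dead host are failed over.
                if (instanceHost.replace(id, deadHost, NULL_HOST)) {
                    failoverCount.incrementAndGet();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    // Run the same failover from several "masters" concurrently and
    // return how many instance failovers actually happened.
    static int simulate(int survivingMasters) {
        FailoverLockSketch cluster = new FailoverLockSketch();
        cluster.addInstance(1, "10.0.0.1:5678");
        cluster.addInstance(2, "10.0.0.1:5678");
        cluster.addInstance(3, "10.0.0.2:5678");
        List<Thread> masters = new ArrayList<>();
        for (int i = 0; i < survivingMasters; i++) {
            Thread t = new Thread(() -> cluster.failoverMaster("10.0.0.1:5678"));
            masters.add(t);
            t.start();
        }
        for (Thread t : masters) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return cluster.failoverCount.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(3)); // 2: instances 1 and 2, each failed over once
    }
}
```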
The overall failover process is roughly as follows:
Failover master, will failover process instance and associated task instance. When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,then the process instance will be failovered.
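1-First, query the DB by the failed node's masterHost for all workflow instances and task instances that need failover.
2-Next, compare each workflow instance's start time with the startup time of the masterHost node. Instances started after the node's startup time are skipped. If the node has not restarted yet, all of its instances need failover.
3-Traverse all task instances under the workflow instance and fail them over.
4-Set the workflow instance's host to NULL and insert a Command of type RECOVER_TOLERANCE_FAULT_PROCESS into the command table.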
private void doFailoverMaster(@NonNull String masterHost) {
    StopWatch failoverTimeCost = StopWatch.createStarted();
    Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
    // 1-query all workflow instances owned by the failed node's masterHost
    List<ProcessInstance> needFailoverProcessInstanceList =
            processService.queryNeedFailoverProcessInstances(masterHost);
    if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
        return;
    }
    LOGGER.info(
            "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
            masterHost,
            needFailoverProcessInstanceList.size(),
            needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        try {
            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
            LOGGER.info("WorkflowInstance failover starting");
            // 2-check whether the workflow instance start time meets the failover condition
            if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
                LOGGER.info("WorkflowInstance doesn't need to failover");
                continue;
            }
            // todo: use batch query
            ProcessDefinition processDefinition =
                    processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                            processInstance.getProcessDefinitionVersion());
            processInstance.setProcessDefinition(processDefinition);
            int processInstanceId = processInstance.getId();
            List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
            // 3-fail over the task instances, see failoverTaskInstance
            for (TaskInstance taskInstance : taskInstanceList) {
                try {
                    LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
                    LOGGER.info("TaskInstance failover starting");
                    if (!checkTaskInstanceNeedFailover(taskInstance)) {
                        LOGGER.info("The taskInstance doesn't need to failover");
                        continue;
                    }
                    failoverTaskInstance(processInstance, taskInstance);
                    LOGGER.info("TaskInstance failover finished");
                } finally {
                    LoggerUtils.removeTaskInstanceIdMDC();
                }
            }
            // 4-insert a failover command
            ProcessInstanceMetrics.incProcessInstanceByState("failover");
            // updateProcessInstance host is null to mark this processInstance has been failover
            // and insert a failover command
            processInstance.setHost(Constants.NULL);
            processService.processNeedFailoverProcessInstances(processInstance);
            LOGGER.info("WorkflowInstance failover finished");
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
    failoverTimeCost.stop();
    LOGGER.info("Master[{}] failover finished, useTime:{}ms",
            masterHost,
            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
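The body of checkProcessInstanceNeedFailover is not shown in this article. The sketch below is therefore not the real implementation; it only encodes the rules stated in the text: an instance whose host was already reset needs no failover, every instance needs failover when the failed Master has not come back, and otherwise only instances started before the Master's current startup time do. The class, the method signature and the "NULL" marker value are assumptions made for illustration.

```java
import java.time.Instant;
import java.util.Optional;

class FailoverCheckSketch {
    static final String NULL_HOST = "NULL"; // assumed value of the "already failed over" marker

    // masterStartupTime is empty when the failed master is not registered again.
    static boolean needFailover(Optional<Instant> masterStartupTime,
                                String instanceHost,
                                Instant instanceStartTime) {
        if (NULL_HOST.equals(instanceHost)) {
            // Host already reset: another master has handled this instance.
            return false;
        }
        if (!masterStartupTime.isPresent()) {
            // The master has not restarted: everything it owned needs failover.
            return true;
        }
        // The master is back: instances it started after its restart are healthy.
        return !instanceStartTime.isAfter(masterStartupTime.get());
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2023-01-01T00:00:00Z");
        // Started before the master restarted, so it was orphaned by the crash.
        System.out.println(needFailover(Optional.of(t0.plusSeconds(60)), "10.0.0.1:5678", t0));
        // Started after the restart, so it is a new, healthy instance.
        System.out.println(needFailover(Optional.of(t0), "10.0.0.1:5678", t0.plusSeconds(60)));
    }
}
```

The startup-time comparison matters when a Master crashes and restarts quickly under the same ip:port: without it, the workflows the restarted Master has just picked up would be failed over as well.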
Build a Command of type RECOVER_TOLERANCE_FAULT_PROCESS:
/**
 * process need failover process instance
 *
 * @param processInstance processInstance
 */
@Override
@Transactional
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
    // 1 update processInstance host is null
    processInstance.setHost(Constants.NULL);
    processInstanceMapper.updateById(processInstance);
    ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    // 2 insert into recover command
    Command cmd = new Command();
    cmd.setProcessDefinitionCode(processDefinition.getCode());
    cmd.setProcessDefinitionVersion(processDefinition.getVersion());
    cmd.setProcessInstanceId(processInstance.getId());
    cmd.setCommandParam(
            String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
    cmd.setExecutorId(processInstance.getExecutorId());
    cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
    cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
    createCommand(cmd);
}
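The commandParam built with String.format is a one-field JSON object that carries the id of the workflow instance to recover. The following sketch shows the resulting string. The key value "ProcessInstanceId" is an assumption made for illustration, since the article does not show the value of CMD_PARAM_RECOVER_PROCESS_ID_STRING.

```java
class RecoverCommandParamSketch {
    // Assumed value of CMD_PARAM_RECOVER_PROCESS_ID_STRING (not shown in the article).
    static final String RECOVER_PROCESS_ID_KEY = "ProcessInstanceId";

    // Same format string as in processNeedFailoverProcessInstances.
    static String buildCommandParam(int processInstanceId) {
        return String.format("{\"%s\":%d}", RECOVER_PROCESS_ID_KEY, processInstanceId);
    }

    public static void main(String[] args) {
        System.out.println(buildCommandParam(42)); // {"ProcessInstanceId":42}
    }
}
```

When the scheduler later picks up the command, this parameter, together with the command's processInstanceId field, tells it to reload the existing workflow instance instead of creating a new one.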
Handling of the task instances in the workflow instance: their state is set to TaskExecutionStatus.NEED_FAULT_TOLERANCE.
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
    TaskMetrics.incTaskInstanceByState("failover");
    boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
    taskInstance.setProcessInstance(processInstance);
    if (!isMasterTask) {
        LOGGER.info("The failover taskInstance is not master task");
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                .create();
        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            LOGGER.info("TaskInstance failover begin kill the task related yarn job");
            ProcessUtils.killYarnJob(logClient, taskExecutionContext);
        }
        // kill worker task, When the master failover and worker failover happened in the same time,
        // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
        // This can be improved if we can load all task when cache a workflowInstance in memory
        sendKillCommandToWorker(taskInstance);
    } else {
        LOGGER.info("The failover taskInstance is a master task");
    }
    taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
    processService.saveTaskInstance(taskInstance);
}
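Rescheduling of failed-over workflows
As described earlier, after the Master starts it launches a MasterSchedulerBootstrap thread to schedule tasks. In DolphinScheduler, whether a task is run on a timer, run once, or being failed over, a command is inserted into the command table when it is due to run. The job of the MasterSchedulerBootstrap thread is to keep fetching pending commands from the command table and schedule them.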
/**
 * run of MasterSchedulerService
 */
@Override
public void run() {
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                // the current server is not at running status, cannot consume command.
                logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            }
            // todo: if the workflow event queue is much, we need to handle the back pressure
            boolean isOverload =
                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
            if (isOverload) {
                logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
                MasterServerMetrics.incMasterOverload();
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                // indicate that no command ,sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            List<ProcessInstance> processInstances = command2ProcessInstance(commands);
            if (CollectionUtils.isEmpty(processInstances)) {
                // indicate that the command transform to processInstance error, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            MasterServerMetrics.incMasterConsumeCommand(commands.size());
            processInstances.forEach(processInstance -> {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                    if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                        logger.error(
                                "The workflow instance is already been cached, this case shouldn't be happened");
                    }
                    WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
                            processService,
                            processInstanceDao,
                            nettyExecutorManager,
                            processAlertManager,
                            masterConfig,
                            stateWheelExecuteThread,
                            curingGlobalParamsService);
                    processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                    workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
                            processInstance.getId()));
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            });
        } catch (InterruptedException interruptedException) {
            logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logger.error("Master schedule workflow error", e);
            // sleep for 1s here to avoid the database down cause the exception boom
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
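The code above fetches commands from the DB in batches of 10 (the default), builds workflow instances from them, and processes each one in turn.
Pay attention to the command2ProcessInstance method, which converts the fetched commands into workflow instances. Internally it ends up calling constructProcessInstance from handleCommand to build the workflow instance. That method records the host of the node currently handling the instance, and applies special settings when the commandType is RECOVER_TOLERANCE_FAULT_PROCESS (some code omitted):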
protected @Nullable ProcessInstance constructProcessInstance(Command command,
                                                             String host) throws CronParseException, CodeGenerateException {
    ProcessInstance processInstance;
    ProcessDefinition processDefinition;
    CommandType commandType = command.getCommandType();
    processDefinition =
            this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
    if (processDefinition == null) {
        logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
        throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
    }
    Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
    int processInstanceId = command.getProcessInstanceId();
    if (processInstanceId == 0) {
        processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
    } else {
        processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
        if (processInstance == null) {
            return null;
        }
    }
    if (cmdParam != null) {
        ......
    }
    // reset command parameter
    if (processInstance.getCommandParam() != null) {
        Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
        processCmdParam.forEach((key, value) -> {
            if (!cmdParam.containsKey(key)) {
                cmdParam.put(key, value);
            }
        });
    }
    // reset command parameter if sub process
    if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_PROCESS)) {
        processInstance.setCommandParam(command.getCommandParam());
    }
    if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
        logger.error("command parameter check failed!");
        return null;
    }
    if (command.getScheduleTime() != null) {
        processInstance.setScheduleTime(command.getScheduleTime());
    }
    // set the host of the handling node and the restartTime
    processInstance.setHost(host);
    processInstance.setRestartTime(new Date());
    WorkflowExecutionStatus runStatus = WorkflowExecutionStatus.RUNNING_EXECUTION;
    int runTime = processInstance.getRunTimes();
    switch (commandType) {
        case START_PROCESS:
            break;
        case START_FAILURE_TASK_PROCESS:
            // find failed tasks and init these tasks
            ......
            break;
        case START_CURRENT_TASK_PROCESS:
            break;
        case RECOVER_WAITING_THREAD:
            break;
        case RECOVER_SUSPENDED_PROCESS:
            // find pause tasks and init task's state
            ......
            break;
        // for failover commands, set recovery to Flag.YES
        case RECOVER_TOLERANCE_FAULT_PROCESS:
            // recover tolerance fault process
            processInstance.setRecovery(Flag.YES);
            processInstance.setRunTimes(runTime + 1);
            runStatus = processInstance.getState();
            break;
        case COMPLEMENT_DATA:
            // delete all the valid tasks when complement data if id is not null
            ......
            break;
        case REPEAT_RUNNING:
            // delete the recover task names from command parameter
            ......
            break;
        case SCHEDULER:
            break;
        default:
            break;
    }
    processInstance.setStateWithDesc(runStatus, commandType.getDescp());
    return processInstance;
}
Each workflow instance is then added to a queue as a workflow event.
private static final LinkedBlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>();

/**
 * Add a workflow event.
 */
public void addEvent(WorkflowEvent workflowEvent) {
    workflowEventQueue.add(workflowEvent);
    logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
}

/**
 * Pool the head of the workflow event queue and wait an workflow event.
 */
public WorkflowEvent poolEvent() throws InterruptedException {
    return workflowEventQueue.take();
}
Where there is a producer there is a consumer: the consuming thread, WorkflowEventLooper, is started when the Master starts.
The entry point is this.masterSchedulerBootstrap.start():
@Override
public synchronized void start() {
    logger.info("Master schedule bootstrap starting..");
    super.start();
    workflowEventLooper.start();
    logger.info("Master schedule bootstrap started...");
}
The consumption logic:
public void run() {
    WorkflowEvent workflowEvent = null;
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            workflowEvent = workflowEventQueue.poolEvent();
            LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
            logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
            WorkflowEventHandler workflowEventHandler =
                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
        } catch (InterruptedException e) {
            logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
            Thread.currentThread().interrupt();
            break;
        } catch (WorkflowEventHandleException workflowEventHandleException) {
            logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
                    workflowEvent, workflowEventHandleException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (WorkflowEventHandleError workflowEventHandleError) {
            logger.error("Handle workflow event error, will drop this event, event: {}",
                    workflowEvent,
                    workflowEventHandleError);
        } catch (Exception unknownException) {
            logger.error(
                    "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
                    workflowEvent, unknownException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
        }
    }
}
As shown, the loop takes each event from the queue and dispatches it to the matching handler. Here that is WorkflowStartEventHandler:
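The queue-plus-handler-map pattern used by the looper can be reduced to a few lines of plain Java. The sketch below is a simplified illustration, not the DolphinScheduler classes: the event type, handler interface and class names are hypothetical, and error handling and re-queueing are left out. It shows a consumer thread blocking on take() and dispatching each event by type while a producer adds events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class EventLooperSketch {
    enum EventType { START_WORKFLOW }

    static final class Event {
        final EventType type;
        final int workflowInstanceId;

        Event(EventType type, int workflowInstanceId) {
            this.type = type;
            this.workflowInstanceId = workflowInstanceId;
        }
    }

    interface Handler {
        void handle(Event event);
    }

    private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<EventType, Handler> handlers = new EnumMap<>(EventType.class);
    private final List<Integer> started = Collections.synchronizedList(new ArrayList<>());

    EventLooperSketch() {
        // One handler per event type; this one just records the started workflow id.
        handlers.put(EventType.START_WORKFLOW, e -> started.add(e.workflowInstanceId));
    }

    void addEvent(Event event) {
        queue.add(event);
    }

    // Consume exactly n events; take() blocks while the queue is empty.
    void consume(int n) throws InterruptedException {
        for (int i = 0; i < n; i++) {
            Event event = queue.take();
            handlers.get(event.type).handle(event);
        }
    }

    // Start a consumer thread, produce three events, and return what was handled.
    static List<Integer> demo() {
        EventLooperSketch looper = new EventLooperSketch();
        Thread consumer = new Thread(() -> {
            try {
                looper.consume(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int id = 1; id <= 3; id++) {
            looper.addEvent(new Event(EventType.START_WORKFLOW, id));
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(looper.started);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```

With a single producer and a single consumer, the FIFO queue preserves the order in which workflow events were added.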
@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
    logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
    WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
            workflowEvent.getWorkflowInstanceId());
    if (workflowExecuteRunnable == null) {
        throw new WorkflowEventHandleError(
                "The workflow start event is invalid, cannot find the workflow instance from cache");
    }
    ProcessInstanceMetrics.incProcessInstanceByState("submit");
    ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
    CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
            .thenAccept(workflowSubmitStatue -> {
                if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
                    // submit failed will resend the event to workflow event queue
                    logger.info("Success submit the workflow instance");
                    if (processInstance.getTimeout() > 0) {
                        stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
                    }
                } else {
                    logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
                            workflowEvent);
                    workflowEventQueue.addEvent(workflowEvent);
                }
            });
}
Finally, the tasks in the workflow instance are submitted:
build the DAG of the workflow instance, initialize the task queue, and submit the head nodes of the DAG.
@Override
public WorkflowSubmitStatue call() {
    if (isStart()) {
        // This case should not been happened
        logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
        return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
    }
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
            buildFlowDag();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
            initTaskQueue();
            workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
            submitPostNode(null);
            workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
            logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
        }
        return WorkflowSubmitStatue.SUCCESS;
    } catch (Exception e) {
        logger.error("Start workflow error", e);
        return WorkflowSubmitStatue.FAILED;
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
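A useful property of the status checks in call() is that the start sequence is resumable: each step advances the status only after it succeeds, so when a failed start event is re-queued and call() runs again, the steps that already completed are not repeated. The sketch below demonstrates that behavior with a simulated one-off failure. The class, the step names recorded in the list and the failure flag are hypothetical test scaffolding, not DolphinScheduler code.

```java
import java.util.ArrayList;
import java.util.List;

class ResumableStartSketch {
    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    // Records which steps actually ran, in order.
    final List<String> executed = new ArrayList<>();
    // Injected fault: the first attempt to initialize the queue fails.
    private boolean failQueueInitOnce = true;

    // Returns true when the workflow reached STARTED, false when a step failed.
    boolean call() {
        try {
            if (status == Status.CREATED) {
                executed.add("buildFlowDag");
                status = Status.INITIALIZE_DAG;
            }
            if (status == Status.INITIALIZE_DAG) {
                if (failQueueInitOnce) {
                    failQueueInitOnce = false;
                    throw new IllegalStateException("simulated failure in initTaskQueue");
                }
                executed.add("initTaskQueue");
                status = Status.INITIALIZE_QUEUE;
            }
            if (status == Status.INITIALIZE_QUEUE) {
                executed.add("submitPostNode");
                status = Status.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            // The status keeps the last completed step, so a retry resumes from there.
            return false;
        }
    }

    public static void main(String[] args) {
        ResumableStartSketch workflow = new ResumableStartSketch();
        System.out.println(workflow.call());   // false, stopped after buildFlowDag
        System.out.println(workflow.call());   // true, resumed at initTaskQueue
        System.out.println(workflow.executed); // [buildFlowDag, initTaskQueue, submitPostNode]
    }
}
```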
So where are the tasks that need failover handled? Take a look at the initTaskQueue method.
It first calls isNewProcessInstance to determine whether this is a new workflow instance:
private boolean isNewProcessInstance() {
    if (Flag.YES.equals(processInstance.getRecovery())) {
        logger.info("This workInstance will be recover by this execution");
        return false;
    }
    if (WorkflowExecutionStatus.RUNNING_EXECUTION == processInstance.getState()
            && processInstance.getRunTimes() == 1) {
        return true;
    }
    logger.info(
            "The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
            processInstance.getState(),
            processInstance.getRunTimes());
    return false;
}
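As shown above, isNewProcessInstance returns false when the workflow instance's recovery property equals Flag.YES. We saw earlier that recovery is set to Flag.YES when a failover command is converted into a ProcessInstance. The method therefore returns false, and the following code runs: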
if (!isNewProcessInstance()) {
    logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
            processInstance.getRunTimes(),
            processInstance.getRecovery());
    List<TaskInstance> validTaskInstanceList =
            processService.findValidTaskListByProcessId(processInstance.getId());
    for (TaskInstance task : validTaskInstanceList) {
        try {
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
            logger.info(
                    "Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
                    task.getTaskCode(),
                    task.getState());
            if (validTaskMap.containsKey(task.getTaskCode())) {
                logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
                        task.getTaskCode());
                int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
                TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
                if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
                    task.setFlag(Flag.NO);
                    processService.updateTaskInstance(task);
                    continue;
                }
            }
            validTaskMap.put(task.getTaskCode(), task.getId());
            taskInstanceMap.put(task.getId(), task);
            if (task.isTaskComplete()) {
                logger.info("TaskInstance is already complete.");
                completeTaskMap.put(task.getTaskCode(), task.getId());
                continue;
            }
            if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
                    dag)) {
                continue;
            }
            if (task.taskCanRetry()) {
                if (task.getState().isNeedFaultTolerance()) {
                    logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
                    task.setFlag(Flag.NO);
                    processService.updateTaskInstance(task);
                    // tolerantTaskInstance add to standby list directly
                    TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
                    addTaskToStandByList(tolerantTaskInstance);
                } else {
                    logger.info("Retry taskInstance, taskState: {}", task.getState());
                    retryTaskInstance(task);
                }
                continue;
            }
            if (task.getState().isFailure()) {
                errorTaskMap.put(task.getTaskCode(), task.getId());
            }
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
} else {
    logger.info("The current workflowInstance is a newly running workflowInstance");
}
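The code above processes every task instance under the workflow instance. Completed tasks are added to completeTaskMap. task.getState().isNeedFaultTolerance() distinguishes tasks that need failover from tasks that need a retry. Failover tasks are added to the readyToSubmitTaskQueue queue (tolerantTaskInstance add to standby list directly).
Finally, submitPostNode triggers execution of the task instances in the workflow instance. From this point on, failed-over tasks are handled just like ordinary tasks.
Summary
Master failover can be broken down into three parts:
1-Determining the failover scope: the whole process runs under a lock, and the failed node's host is used to find the workflow instances that need failover.
2-Failover handling: this covers both workflow instances and task instances. Before failover, the instance start time is compared with the node's startup time, and instances started after the node's startup are skipped. Finally a command is written to the command table.
3-Scheduling of failed-over tasks: when the scheduler thread picks up the command, it rebuilds the workflow instance, places the task instances in the appropriate queues according to the failover type, and schedules them again.
See the official documentation for more details.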