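DolphinScheduler Fault-Tolerance Source Code Analysis: Worker
Introduction
The previous article covered the Master's fault-tolerance mechanism in DolphinScheduler. DolphinScheduler uses a decentralized architecture in which multiple Masters and multiple Workers run as peers, so the Worker's fault-tolerance mechanism deserves the same attention.
As with the Master analysis, the source code discussed here is version 3.1.3.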
Worker Fault-Tolerance Source Code Analysis
Worker startup and registration
The Worker's startup entry point is WorkerServer. Once the Worker starts, its run method is executed:
```java
@PostConstruct
public void run() {
    this.workerRpcServer.start();
    this.workerRpcClient.start();
    this.taskPluginManager.loadPlugin();

    this.workerRegistryClient.setRegistryStoppable(this);
    this.workerRegistryClient.start();

    this.workerManagerThread.start();

    this.messageRetryRunner.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (!ServerLifeCycleManager.isStopped()) {
            close("WorkerServer shutdown hook");
        }
    }));
}
```
Here we only care about what this.workerRegistryClient.start() does: it registers the current worker's information in ZooKeeper and starts a heartbeat task that periodically updates that information in ZooKeeper.
```java
/**
 * registry
 */
private void registry() {
    WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
    String workerZKPath = workerConfig.getWorkerRegistryPath();
    // remove before persist
    registryClient.remove(workerZKPath);
    registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
    log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);

    while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
        ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    }

    // sleep 1s, waiting master failover remove
    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

    workerHeartBeatTask.start();
    log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
```
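To make the ordering in registry() concrete, here is a minimal sketch that replays it against an in-memory map standing in for ZooKeeper: remove any stale node, write the first heartbeat, wait until the node is visible, then refresh it on a fixed schedule. InMemoryRegistry, WorkerRegistrationSketch and the periodSeconds parameter are hypothetical names for illustration, not DolphinScheduler APIs, and the map does not model ephemeral-node expiry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the ZooKeeper-backed registry client.
// Ephemeral-node expiry (node removed when the session dies) is NOT modelled.
class InMemoryRegistry {
    final Map<String, String> nodes = new ConcurrentHashMap<>();

    void remove(String path) { nodes.remove(path); }
    void persistEphemeral(String path, String value) { nodes.put(path, value); }
    boolean exists(String path) { return nodes.containsKey(path); }
}

class WorkerRegistrationSketch {
    // Daemon thread so the sketch never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-sketch");
        t.setDaemon(true);
        return t;
    });

    void register(InMemoryRegistry registry, String path, Supplier<String> heartBeat, long periodSeconds) {
        registry.remove(path);                            // drop a node left over from a previous run
        registry.persistEphemeral(path, heartBeat.get()); // publish the first heartbeat
        while (!registry.exists(path)) {                  // wait until the node is visible
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Periodically overwrite the node with a fresh heartbeat (the role of workerHeartBeatTask).
        scheduler.scheduleAtFixedRate(
                () -> registry.persistEphemeral(path, heartBeat.get()),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void stop() { scheduler.shutdownNow(); }
}
```

Removing the node before writing it matters on a fast restart: the previous process's ephemeral node may still exist until its session times out.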
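The registration flow is essentially the same as the master's. Now look at where the worker registers.
The worker registers under the following ZooKeeper path, which shares the parent directory /nodes with the master: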
```java
// /nodes/worker/ + ip:listenPort
workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
```
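The registered content is the current worker node's health: CPU, memory, load, disk, and so on. From this information the master can tell whether the worker is healthy enough to be assigned tasks and execute them.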
```java
@Override
public WorkerHeartBeat getHeartBeat() {
    double loadAverage = OSUtils.loadAverage();
    double cpuUsage = OSUtils.cpuUsage();
    int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
    double reservedMemory = workerConfig.getReservedMemory();
    double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
    int execThreads = workerConfig.getExecThreads();
    int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
    int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
            execThreads, workerWaitingTaskCount);

    return WorkerHeartBeat.builder()
            .startupTime(ServerLifeCycleManager.getServerStartupTime())
            .reportTime(System.currentTimeMillis())
            .cpuUsage(cpuUsage)
            .loadAverage(loadAverage)
            .availablePhysicalMemorySize(availablePhysicalMemorySize)
            .maxCpuloadAvg(maxCpuLoadAvg)
            .memoryUsage(OSUtils.memoryUsage())
            .reservedMemory(reservedMemory)
            .diskAvailable(OSUtils.diskAvailable())
            .processId(processId)
            .workerHostWeight(workerConfig.getHostWeight())
            .workerWaitingTaskCount(this.workerWaitingTaskCount.get())
            .workerExecThreadCount(workerConfig.getExecThreads())
            .serverStatus(serverStatus)
            .build();
}
```
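The heartbeat carries a serverStatus computed by getServerStatus, whose body the article does not show. The sketch below is one plausible reading of how the heartbeat fields could map to a status: CPU load above maxCpuLoadAvg or available memory below the reserved amount marks the node abnormal, and more waiting tasks than execution threads marks it busy. NodeStatus, WorkerHealth and these exact rules are assumptions for illustration; check getServerStatus in your version for the real logic.

```java
// Hypothetical status values; the real code uses integer constants.
enum NodeStatus { NORMAL, ABNORMAL, BUSY }

final class WorkerHealth {
    private WorkerHealth() {}

    // Assumed classification rules, for illustration only.
    static NodeStatus classify(double loadAverage, int maxCpuLoadAvg,
                               double availablePhysicalMemorySize, double reservedMemory,
                               int execThreads, int waitingTaskCount) {
        if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) {
            return NodeStatus.ABNORMAL; // overloaded CPU or not enough free memory
        }
        if (waitingTaskCount > execThreads) {
            return NodeStatus.BUSY;     // more queued tasks than execution threads
        }
        return NodeStatus.NORMAL;
    }
}
```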
Master watches the worker's ZooKeeper node
Next, the master monitors the registered worker nodes. As described in the previous article, after the master starts and registers, it already subscribes to the /nodes node; whenever a child path under /nodes/master or /nodes/worker changes, a callback is triggered:
```java
// /nodes
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
```
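So when a worker's ephemeral node disappears abnormally, the master notices the change. The notify method of MasterRegistryDataListener is eventually called, and it uses the changed path to decide whether the event concerns a master or a worker: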
```java
@Override
public void notify(Event event) {
    final String path = event.path();
    if (Strings.isNullOrEmpty(path)) {
        return;
    }
    // monitor master
    if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
        handleMasterEvent(event);
    } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
        // monitor worker
        handleWorkerEvent(event);
    }
}
```
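The prefix dispatch in notify can be isolated into a small, testable function. This sketch assumes the constants resolve to /nodes/master and /nodes/worker as described above; NodeKind and RegistryPathRouter are hypothetical names. Matching on the parent path plus a trailing slash means only child nodes (individual servers) are routed, not the parent directory itself.

```java
// Which kind of server a registry path belongs to.
enum NodeKind { MASTER, WORKER, NONE }

final class RegistryPathRouter {
    // Assumed values for the 3.1.x layout: /nodes/master/<host> and /nodes/worker/<host>.
    static final String MASTERS = "/nodes/master";
    static final String WORKERS = "/nodes/worker";
    static final String SLASH = "/";

    private RegistryPathRouter() {}

    static NodeKind route(String path) {
        if (path == null || path.isEmpty()) {
            return NodeKind.NONE;
        }
        // "<parent>/" matches children only, never the parent directory itself.
        if (path.startsWith(MASTERS + SLASH)) {
            return NodeKind.MASTER;
        }
        if (path.startsWith(WORKERS + SLASH)) {
            return NodeKind.WORKER;
        }
        return NodeKind.NONE;
    }
}
```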
This code also appeared in the master fault-tolerance analysis. For worker fault tolerance, the handleWorkerEvent method is triggered:
```java
private void handleWorkerEvent(Event event) {
    final String path = event.path();
    switch (event.type()) {
        case ADD:
            logger.info("worker node added : {}", path);
            break;
        case REMOVE:
            logger.info("worker node deleted : {}", path);
            masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
            break;
        default:
            break;
    }
}
```
Next, the host of the worker that went offline is extracted from the path, and further failover processing is performed:
```java
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
    logger.info("{} node deleted : {}", nodeType, path);
    try {
        // get the host of the deleted node
        String serverHost = null;
        if (!StringUtils.isEmpty(path)) {
            serverHost = registryClient.getHostByEventDataPath(path);
            if (StringUtils.isEmpty(serverHost)) {
                logger.error("server down error: unknown path: {}", path);
                return;
            }
            if (!registryClient.exists(path)) {
                logger.info("path: {} not exists", path);
            }
        }
        // failover server
        if (failover) {
            failoverService.failoverServerWhenDown(serverHost, nodeType);
        }
    } catch (Exception e) {
        logger.error("{} server failover failed", nodeType, e);
    }
}
```
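removeWorkerNodePath relies on registryClient.getHostByEventDataPath to turn the deleted node's path back into the worker's ip:port. A minimal sketch of that round trip, assuming the host is simply the last path segment (our reading of the /nodes/worker/ip:listenPort layout; the real helper may validate differently). WorkerPaths and its methods are hypothetical.

```java
final class WorkerPaths {
    // Assumed parent path, matching the /nodes/worker/ip:listenPort layout.
    static final String WORKERS = "/nodes/worker";

    private WorkerPaths() {}

    // Build the registry path a worker writes its heartbeat to.
    static String registryPath(String ip, int listenPort) {
        return WORKERS + "/" + ip + ":" + listenPort;
    }

    // Recover "ip:port" from a node path by taking its last segment.
    static String hostOf(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("path cannot be null or empty");
        }
        String host = path.substring(path.lastIndexOf('/') + 1);
        if (host.isEmpty()) {
            throw new IllegalArgumentException("cannot parse host from path: " + path);
        }
        return host;
    }
}
```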
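The overall worker failover process is as follows:
1- Get the startup time of the worker being failed over. It is used later to determine whether the worker is still offline or has already restarted.
2- Query the task instances that need failover based on the failed worker's information, keeping only the task instances that belong to the current master. This is where worker failover differs from master failover, and it is also why worker failover does not take a lock.
3- Iterate over all task instances that need failover and fail each one over. Note that only tasks started before the worker restarted need failover; new tasks assigned to the worker after it restarted must not be failed over.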
```java
/**
 * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
 * and failover these tasks.
 * <p>
 * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
 *
 * @param workerHost worker host
 */
public void failoverWorker(@NonNull String workerHost) {
    LOGGER.info("Worker[{}] failover starting", workerHost);
    final StopWatch failoverTimeCost = StopWatch.createStarted();

    // Get the startup time of the worker being failed over, used later to tell
    // whether the worker is still offline or has already restarted.
    // we query the task instance from cache, so that we can directly update the cache
    final Optional<Date> needFailoverWorkerStartTime =
            getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);

    // Query the task instances to fail over for the failed worker, keeping only those owned by
    // the current master. This differs from master failover and is why no lock is taken.
    final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
    if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
        LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
        return;
    }
    LOGGER.info(
            "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
            workerHost,
            needFailoverTaskInstanceList.size(),
            needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
    final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
    for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
        try {
            ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
                    taskInstance.getProcessInstanceId(), k -> {
                        WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
                                taskInstance.getProcessInstanceId());
                        if (workflowExecuteRunnable == null) {
                            return null;
                        }
                        return workflowExecuteRunnable.getProcessInstance();
                    });
            // Only tasks started before the worker restarted need failover;
            // tasks assigned to the worker after it restarted must not be failed over.
            if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
                LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
                continue;
            }
            LOGGER.info(
                    "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
                    workerHost);
            failoverTaskInstance(processInstance, taskInstance);
            LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
        } catch (Exception ex) {
            LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
    failoverTimeCost.stop();
    LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
            workerHost,
            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
```
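The decisive rule in step 3 lives in checkTaskInstanceNeedFailover, whose body the article does not show. Below is a simplified sketch of the rules described in steps 2 and 3 and in the summary. TaskSnapshot and WorkerFailoverCheck are hypothetical, and the explicit owner-host comparison is an assumption made for the sketch; in the code above, ownership is implied by reading workflow instances from the master's local cacheManager.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical, trimmed-down view of a task instance: only the fields this check needs.
final class TaskSnapshot {
    final String ownerMasterHost; // master that owns the task's workflow instance
    final boolean finished;
    final Instant startTime;      // null if the task never started

    TaskSnapshot(String ownerMasterHost, boolean finished, Instant startTime) {
        this.ownerMasterHost = ownerMasterHost;
        this.finished = finished;
        this.startTime = startTime;
    }
}

final class WorkerFailoverCheck {
    private WorkerFailoverCheck() {}

    // workerStartupTime is present only if the worker has registered again (it restarted).
    static boolean needFailover(Optional<Instant> workerStartupTime, TaskSnapshot task, String localMasterHost) {
        if (task == null) {
            return false;
        }
        // Each master only fails over tasks of workflow instances it owns.
        if (!localMasterHost.equalsIgnoreCase(task.ownerMasterHost)) {
            return false;
        }
        // Finished tasks have nothing to recover.
        if (task.finished) {
            return false;
        }
        // The worker came back and this task started after the restart, so it belongs to the new process.
        if (workerStartupTime.isPresent() && task.startTime != null
                && task.startTime.isAfter(workerStartupTime.get())) {
            return false;
        }
        return true;
    }
}
```

When the worker is still down, no startup time is found, so every unfinished task owned by this master is failed over.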
4- Update the taskInstance's state to TaskExecutionStatus.NEED_FAULT_TOLERANCE, then build a TaskStateEvent whose status is TaskExecutionStatus.NEED_FAULT_TOLERANCE and whose type is TASK_STATE_CHANGE, and finally submit the event for failover handling.
```java
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
    TaskMetrics.incTaskInstanceByState("failover");
    boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());

    taskInstance.setProcessInstance(processInstance);

    if (!isMasterTask) {
        LOGGER.info("The failover taskInstance is not master task");
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                .create();

        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            LOGGER.info("TaskInstance failover begin kill the task related yarn job");
            ProcessUtils.killYarnJob(logClient, taskExecutionContext);
        }
    } else {
        LOGGER.info("The failover taskInstance is a master task");
    }

    taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
    taskInstance.setFlag(Flag.NO);
    processService.saveTaskInstance(taskInstance);

    // submit the event
    TaskStateEvent stateEvent = TaskStateEvent.builder()
            .processInstanceId(processInstance.getId())
            .taskInstanceId(taskInstance.getId())
            .status(TaskExecutionStatus.NEED_FAULT_TOLERANCE)
            .type(StateEventType.TASK_STATE_CHANGE)
            .build();
    workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
```
Submitting the event looks up the WorkflowExecuteRunnable of the workflow instance the event belongs to and hands the event to it:
```java
public void submitStateEvent(StateEvent stateEvent) {
    WorkflowExecuteRunnable workflowExecuteThread =
            processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
    if (workflowExecuteThread == null) {
        logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
                stateEvent);
        return;
    }
    workflowExecuteThread.addStateEvent(stateEvent);
    logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
```
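The routing in submitStateEvent amounts to "one event queue per running workflow instance, keyed by processInstanceId". A minimal sketch of that idea, with hypothetical TaskStateEventSketch and StateEventRouter classes in place of the real TaskStateEvent and cache manager: an event for a workflow this master does not hold is dropped, as in the warning branch above.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified task state event: routing keys plus target status.
final class TaskStateEventSketch {
    final int processInstanceId;
    final int taskInstanceId;
    final String status;

    TaskStateEventSketch(int processInstanceId, int taskInstanceId, String status) {
        this.processInstanceId = processInstanceId;
        this.taskInstanceId = taskInstanceId;
        this.status = status;
    }
}

final class StateEventRouter {
    // One event queue per workflow instance cached on this master.
    private final Map<Integer, Queue<TaskStateEventSketch>> queues = new ConcurrentHashMap<>();

    void registerWorkflow(int processInstanceId) {
        queues.putIfAbsent(processInstanceId, new ConcurrentLinkedQueue<>());
    }

    // Returns false and drops the event when the workflow is not cached here.
    boolean submit(TaskStateEventSketch event) {
        Queue<TaskStateEventSketch> queue = queues.get(event.processInstanceId);
        if (queue == null) {
            return false;
        }
        queue.offer(event);
        return true;
    }

    Queue<TaskStateEventSketch> queueOf(int processInstanceId) {
        return queues.get(processInstanceId);
    }
}
```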
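Handling the failover event
The code above submitted an event for each task that needs failover, so some thread must process those events. Look at the WorkflowExecuteRunnable class: submitStateEvent (through addStateEvent) puts the event into this class's stateEvents queue: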
```java
private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
```
WorkflowExecuteRunnable is already running once the master has started, and it keeps taking events from stateEvents and handling them:
```java
/**
 * handle event
 */
public void handleEvents() {
    if (!isStart()) {
        logger.info(
                "The workflow instance is not started, will not handle its state event, current state event size: {}",
                stateEvents);
        return;
    }
    StateEvent stateEvent = null;
    while (!this.stateEvents.isEmpty()) {
        try {
            stateEvent = this.stateEvents.peek();
            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
                    stateEvent.getTaskInstanceId());
            // if state handle success then will remove this state, otherwise will retry this state next time.
            // The state should always handle success except database error.
            checkProcessInstance(stateEvent);

            StateEventHandler stateEventHandler =
                    StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                            .orElseThrow(() -> new StateEventHandleError(
                                    "Cannot find handler for the given state event"));
            logger.info("Begin to handle state event, {}", stateEvent);
            if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                this.stateEvents.remove(stateEvent);
            }
        } catch (StateEventHandleError stateEventHandleError) {
            logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
            this.stateEvents.remove(stateEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (StateEventHandleException stateEventHandleException) {
            logger.error("State event handle error, will retry this event: {}", stateEvent,
                    stateEventHandleException);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            // we catch the exception here, since if the state event handle failed, the state event will still keep
            // in the stateEvents queue.
            logger.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent,
                    e);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
}
```
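The core of handleEvents is a peek-then-remove pattern: an event leaves the queue only after it has been handled or judged unrecoverable, so a transient failure keeps it at the head. The sketch below shows that pattern with hypothetical exception types standing in for StateEventHandleError (drop) and StateEventHandleException (retry). One deliberate difference: the original sleeps and retries inside the same loop, while this sketch returns so the caller decides when to drain again, and other runtime exceptions simply propagate.

```java
import java.util.Queue;

// Plays the role of StateEventHandleError: retrying can never succeed.
class UnrecoverableEventException extends RuntimeException {
    UnrecoverableEventException(String message) { super(message); }
}

// Plays the role of StateEventHandleException: a transient failure such as a database error.
class RetryableEventException extends Exception {
    RetryableEventException(String message) { super(message); }
}

// Hypothetical handler: returns true once the event is fully handled.
interface EventHandler<E> {
    boolean handle(E event) throws RetryableEventException;
}

final class EventLoopSketch {
    private EventLoopSketch() {}

    // Drains the queue with peek-then-remove and returns the number of handled events.
    static <E> int drain(Queue<E> events, EventHandler<E> handler) {
        int handled = 0;
        while (!events.isEmpty()) {
            E event = events.peek();
            try {
                if (!handler.handle(event)) {
                    return handled;   // not done yet: keep it at the head
                }
                events.remove(event);
                handled++;
            } catch (UnrecoverableEventException e) {
                events.remove(event); // drop it, retrying cannot help
            } catch (RetryableEventException e) {
                return handled;       // keep it at the head for the next drain
            }
        }
        return handled;
    }
}
```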
Given the event type StateEventType.TASK_STATE_CHANGE, the concrete StateEventHandler implementation is TaskStateEventHandler. Its handleStateEvent method handles a task that needs failover as follows:
```java
if (task.getState().isFinished()) {
    if (completeTaskMap.containsKey(task.getTaskCode())
            && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
        logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
        return true;
    }
    workflowExecuteRunnable.taskFinished(task);
    if (task.getTaskGroupId() > 0) {
        logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
        workflowExecuteRunnable.releaseTaskGroup(task);
    }
    return true;
}
```
The isFinished check treats the fault-tolerance state as finished, because isFailure includes NEED_FAULT_TOLERANCE:
```java
public boolean isFinished() {
    return isSuccess() || isKill() || isFailure() || isPause();
}

public boolean isFailure() {
    return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
}
```
Next, workflowExecuteRunnable.taskFinished(task) is called to handle what follows a task instance's state change. Here we only look at the branch related to failover:
```java
// Retry branch inside workflowExecuteRunnable.taskFinished(task)
} else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
    // retry task
    logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
    retryTaskInstance(taskInstance);
}

// TaskInstance: checks for the fault-tolerance state, which was set earlier
public boolean taskCanRetry() {
    if (this.isSubProcess()) {
        return false;
    }
    if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
        return true;
    }
    return this.getState() == TaskExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
}

/**
 * create new task instance to retry, different objects from the original
 */
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
    if (!taskInstance.taskCanRetry()) {
        return;
    }
    TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
    if (newTaskInstance == null) {
        logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
                taskInstance.getTaskCode(),
                taskInstance.getId());
        return;
    }
    waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
    if (!taskInstance.retryTaskIntervalOverTime()) {
        logger.info(
                "Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
                processInstance.getId(), newTaskInstance.getTaskCode(),
                newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
                newTaskInstance.getRetryInterval());
        stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
        stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
    } else {
        addTaskToStandByList(newTaskInstance);
        submitStandByTask();
        waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
    }
}
```
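The two predicates that route a failed-over task into the retry branch fit in a small, self-contained sketch. TaskStatus is a trimmed-down, hypothetical copy of TaskExecutionStatus (the real enum has more states), and RetryPolicySketch.canRetry mirrors taskCanRetry as quoted above. The key point is that NEED_FAULT_TOLERANCE is treated as a finished failure, yet it may be retried regardless of the remaining retry budget.

```java
// Trimmed-down, hypothetical copy of TaskExecutionStatus.
enum TaskStatus {
    SUBMITTED_SUCCESS, RUNNING_EXECUTION, SUCCESS, KILL, PAUSE, FAILURE, NEED_FAULT_TOLERANCE;

    boolean isFailure() {
        return this == FAILURE || this == NEED_FAULT_TOLERANCE;
    }

    boolean isFinished() {
        return this == SUCCESS || this == KILL || isFailure() || this == PAUSE;
    }
}

final class RetryPolicySketch {
    private RetryPolicySketch() {}

    // Mirrors taskCanRetry: sub-processes never retry, failover always retries,
    // ordinary failures retry while the retry budget lasts.
    static boolean canRetry(TaskStatus state, boolean isSubProcess, int retryTimes, int maxRetryTimes) {
        if (isSubProcess) {
            return false;
        }
        if (state == TaskStatus.NEED_FAULT_TOLERANCE) {
            return true;
        }
        return state == TaskStatus.FAILURE && retryTimes < maxRetryTimes;
    }
}
```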
Finally, the task instance that needs failover is added back to the readyToSubmitTaskQueue and submitted again:
```java
addTaskToStandByList(newTaskInstance);
submitStandByTask();
```
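From here on it is handled like any other task: the submitTaskExec method submits it to a worker for execution.
Summary
The worker failover flow can be summarized as follows:
1- The Master detects the Worker nodes that need failover by watching ZooKeeper.
2- Each Master only fails over the workflow instances it schedules itself. Before failover, it compares each task instance's start time with the worker's startup time, and skips the tasks that started after the worker's startup time.
3- Task instances that need failover are added back to readyToSubmitTaskQueue and submitted for execution.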
That concludes the analysis of Worker fault tolerance.