
DolphinScheduler Fault-Tolerance Source Code Analysis: Worker

Updated: 2023-02-06 11:42:14   Author: leo的跟班
This article analyzes the source code behind the Worker fault-tolerance mechanism in DolphinScheduler, for readers who want a reference.

Introduction

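The previous article covered the Master's fault-tolerance mechanism in DolphinScheduler. Since DolphinScheduler uses a decentralized architecture in which multiple Masters and multiple Workers serve as peers, the Worker's fault-tolerance mechanism deserves attention as well.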

As with the Master analysis, the source code is based on version 3.1.3.

Worker Fault-Tolerance Source Code Analysis

Worker startup and registration

The Worker's entry point is WorkerServer. Its run method is annotated with @PostConstruct, so it executes once the Worker has started:

@PostConstruct
public void run() {
	this.workerRpcServer.start();
	this.workerRpcClient.start();
	this.taskPluginManager.loadPlugin();
	this.workerRegistryClient.setRegistryStoppable(this);
	this.workerRegistryClient.start();
	this.workerManagerThread.start();
	this.messageRetryRunner.start();
	/*
	 * registry hooks, which are called before the process exits
	 */
	Runtime.getRuntime().addShutdownHook(new Thread(() -> {
		if (!ServerLifeCycleManager.isStopped()) {
			close("WorkerServer shutdown hook");
		}
	}));
}

Here we only care about what this.workerRegistryClient.start() does: it registers the current worker's information in ZooKeeper and starts a heartbeat task that periodically updates that information in ZooKeeper.

/**
 * registry
 */
private void registry() {
	WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
	String workerZKPath = workerConfig.getWorkerRegistryPath();
	// remove before persist
	registryClient.remove(workerZKPath);
	registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
	log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
	while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
		ThreadUtils.sleep(SLEEP_TIME_MILLIS);
	}
	// sleep 1s, waiting master failover remove
	ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
	workerHeartBeatTask.start();
	log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
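The register-then-heartbeat pattern above can be illustrated without ZooKeeper. The following is a minimal sketch under stated assumptions: InMemoryRegistry is a hypothetical stand-in for the registry client (a map instead of ZooKeeper ephemeral nodes, so nothing disappears when a session ends), the heartbeat payload is a plain string supplied by the caller, and the /nodes/worker/<ip:port> layout follows the path comment quoted in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical in-memory stand-in for a ZooKeeper-backed registry client. */
class InMemoryRegistry {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    void remove(String path) { nodes.remove(path); }

    /** A real registry ties this node to the client session; here it is only a map entry. */
    void persistEphemeral(String path, String value) { nodes.put(path, value); }

    boolean exists(String path) { return nodes.containsKey(path); }

    String get(String path) { return nodes.get(path); }
}

/** Sketch of "remove stale node, register, wait until visible, then heartbeat". */
class WorkerRegistrationSketch {
    private final InMemoryRegistry registry;
    private final String path;
    private final Supplier<String> heartBeat;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    WorkerRegistrationSketch(InMemoryRegistry registry, String workerAddress, Supplier<String> heartBeat) {
        this.registry = registry;
        this.path = "/nodes/worker/" + workerAddress; // assumed layout: /nodes/worker/<ip:port>
        this.heartBeat = heartBeat;
    }

    String path() { return path; }

    void register() {
        registry.remove(path);                            // drop a node left over from a previous run
        registry.persistEphemeral(path, heartBeat.get()); // publish the current health snapshot
        while (!registry.exists(path)) {                  // wait until the node is visible
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for registration", e);
            }
        }
    }

    /** Overwrites the node with a fresh heartbeat; startHeartBeat runs this periodically. */
    void heartBeatOnce() { registry.persistEphemeral(path, heartBeat.get()); }

    void startHeartBeat(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::heartBeatOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() { scheduler.shutdownNow(); }
}
```

The real registry() additionally sleeps for SLEEP_TIME_MILLIS after registration ("waiting master failover remove") before starting the heartbeat task; the sketch leaves that step out.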

This is essentially the same as the master registration flow. Now let's look at where the worker registers.

The worker's ZooKeeper path is built as follows; it shares the parent directory /nodes with the master:

// /nodes/worker/ + ip:listenPort
workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());

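The registered content describes the current worker node's health, including CPU, memory, load, and disk information. From it, the master can tell whether the worker is healthy and able to accept and execute tasks.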

@Override
public WorkerHeartBeat getHeartBeat() {
	double loadAverage = OSUtils.loadAverage();
	double cpuUsage = OSUtils.cpuUsage();
	int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
	double reservedMemory = workerConfig.getReservedMemory();
	double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
	int execThreads = workerConfig.getExecThreads();
	int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
	int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
			execThreads, workerWaitingTaskCount);
	return WorkerHeartBeat.builder()
			.startupTime(ServerLifeCycleManager.getServerStartupTime())
			.reportTime(System.currentTimeMillis())
			.cpuUsage(cpuUsage)
			.loadAverage(loadAverage)
			.availablePhysicalMemorySize(availablePhysicalMemorySize)
			.maxCpuloadAvg(maxCpuLoadAvg)
			.memoryUsage(OSUtils.memoryUsage())
			.reservedMemory(reservedMemory)
			.diskAvailable(OSUtils.diskAvailable())
			.processId(processId)
			.workerHostWeight(workerConfig.getHostWeight())
			.workerWaitingTaskCount(this.workerWaitingTaskCount.get())
			.workerExecThreadCount(workerConfig.getExecThreads())
			.serverStatus(serverStatus)
			.build();
}
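getHeartBeat delegates the health verdict to getServerStatus, which the article does not show. The sketch below is a guess at the kind of rule it applies, assuming (not confirmed here) that a worker is abnormal when its load average exceeds maxCpuLoadAvg or its free memory drops below reservedMemory, and busy when more tasks are waiting than it has execution threads. ServerStatus and evaluate are hypothetical names.

```java
/** Hypothetical status values; the real heartbeat carries an int serverStatus. */
enum ServerStatus { NORMAL, BUSY, ABNORMAL }

final class ServerStatusSketch {
    private ServerStatusSketch() {}

    static ServerStatus evaluate(double loadAverage, double maxCpuLoadAvg,
                                 double availablePhysicalMemory, double reservedMemory,
                                 int execThreads, int waitingTaskCount) {
        // Overloaded CPU or too little free memory: the worker should not receive new tasks.
        if (loadAverage > maxCpuLoadAvg || availablePhysicalMemory < reservedMemory) {
            return ServerStatus.ABNORMAL;
        }
        // More queued tasks than execution threads: healthy, but saturated.
        if (waitingTaskCount > execThreads) {
            return ServerStatus.BUSY;
        }
        return ServerStatus.NORMAL;
    }
}
```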

The Master watches the workers' node state in ZooKeeper

Next, the master monitors the registered worker nodes. As covered in the previous article, after registering at startup the master subscribes to the /nodes node; whenever something changes under its child paths /master or /worker, a callback is triggered:

// /nodes
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());

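Therefore, when a worker's ephemeral node disappears (for example, because the worker's ZooKeeper session expired), the master notices the change. The callback ends up in the notify method of MasterRegistryDataListener, which uses the changed path to decide whether the event concerns a master or a worker: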

@Override
public void notify(Event event) {
	final String path = event.path();
	if (Strings.isNullOrEmpty(path)) {
		return;
	}
	//monitor master
	if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
		handleMasterEvent(event);
	} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
		//monitor worker
		handleWorkerEvent(event);
	}
}

We already saw this code in the master fault-tolerance analysis. For a worker, it calls handleWorkerEvent:

private void handleWorkerEvent(Event event) {
	final String path = event.path();
	switch (event.type()) {
		case ADD:
			logger.info("worker node added : {}", path);
			break;
		case REMOVE:
			logger.info("worker node deleted : {}", path);
			masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
			break;
		default:
			break;
	}
}
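Taken together, notify and handleWorkerEvent route a registry event by path prefix and act only on removals. A minimal sketch of that routing, assuming the prefixes /nodes/master and /nodes/worker, with a hypothetical RegistryEventRouter whose callbacks stand in for the master handling and for masterRegistryClient.removeWorkerNodePath:

```java
import java.util.function.Consumer;

enum EventType { ADD, REMOVE, UPDATE }

final class RegistryEventRouter {
    static final String MASTERS = "/nodes/master"; // assumed prefix
    static final String WORKERS = "/nodes/worker"; // assumed prefix

    private final Consumer<String> onMasterRemoved;
    private final Consumer<String> onWorkerRemoved;

    RegistryEventRouter(Consumer<String> onMasterRemoved, Consumer<String> onWorkerRemoved) {
        this.onMasterRemoved = onMasterRemoved;
        this.onWorkerRemoved = onWorkerRemoved;
    }

    void onEvent(String path, EventType type) {
        if (path == null || path.isEmpty()) {
            return;
        }
        // The trailing "/" keeps a sibling such as "/nodes/workers-x" from matching.
        if (path.startsWith(MASTERS + "/")) {
            if (type == EventType.REMOVE) {
                onMasterRemoved.accept(path);
            }
        } else if (path.startsWith(WORKERS + "/")) {
            // ADD is only logged in the code above; REMOVE starts worker failover.
            if (type == EventType.REMOVE) {
                onWorkerRemoved.accept(path);
            }
        }
    }
}
```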

Next, the host of the offline worker is extracted from the path and passed on for the actual failover:

public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
	logger.info("{} node deleted : {}", nodeType, path);
	try {
		// get the host of the removed node
		String serverHost = null;
		if (!StringUtils.isEmpty(path)) {
			serverHost = registryClient.getHostByEventDataPath(path);
			if (StringUtils.isEmpty(serverHost)) {
				logger.error("server down error: unknown path: {}", path);
				return;
			}
			if (!registryClient.exists(path)) {
				logger.info("path: {} not exists", path);
			}
		}
		// failover server
		if (failover) {
			failoverService.failoverServerWhenDown(serverHost, nodeType);
		}
	} catch (Exception e) {
		logger.error("{} server failover failed", nodeType, e);
	}
}
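removeWorkerNodePath relies on registryClient.getHostByEventDataPath to turn the event path into a host. The article does not show that method; assuming the host is simply the last path segment (consistent with the /nodes/worker/<ip:port> layout above), the step reduces to this hypothetical helper:

```java
import java.util.Optional;

final class EventPathParser {
    private EventPathParser() {}

    /** Returns the last path segment (for example "10.0.0.5:1234"), or empty when there is none. */
    static Optional<String> hostOf(String path) {
        if (path == null || path.isEmpty()) {
            return Optional.empty();
        }
        String host = path.substring(path.lastIndexOf('/') + 1);
        return host.isEmpty() ? Optional.empty() : Optional.of(host);
    }
}
```

Returning an empty Optional plays the role of the "unknown path" early return above, so a malformed event never reaches failoverServerWhenDown.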

The overall worker failover process is roughly as follows:

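1-Get the startup time of the worker that needs failover. It is used later to determine whether the worker is still offline or has already restarted.

2-Query the task instances that need failover based on the failed worker's information, keeping only those that belong to the current master. This is where worker failover differs from master failover, and it is why worker failover does not take a lock.

3-Iterate over these task instances and fail each one over. Note that only tasks assigned before the worker restarted need failover; new tasks assigned to the worker after it restarted must not be failed over.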

/**
 * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
 * and failover these tasks.
 * <p>
 * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
 *
 * @param workerHost worker host
 */
public void failoverWorker(@NonNull String workerHost) {
	LOGGER.info("Worker[{}] failover starting", workerHost);
	final StopWatch failoverTimeCost = StopWatch.createStarted();
	// get the startup time of the worker to fail over, used later to tell whether it is still offline or has already restarted
	// we query the task instance from cache, so that we can directly update the cache
	final Optional<Date> needFailoverWorkerStartTime =
			getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
	// query the task instances of the failed worker that belong to the current master only; this differs from master failover and is why no lock is taken
	final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
	if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
		LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
		return;
	}
	LOGGER.info(
			"Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
			workerHost,
			needFailoverTaskInstanceList.size(),
			needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
	final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
	for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
		LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
		try {
			ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
					taskInstance.getProcessInstanceId(), k -> {
						WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
								taskInstance.getProcessInstanceId());
						if (workflowExecuteRunnable == null) {
							return null;
						}
						return workflowExecuteRunnable.getProcessInstance();
					});
			// only tasks from before the worker restarted need failover; new tasks dispatched after the restart are skipped
			if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
				LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
				continue;
			}
			LOGGER.info(
					"Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
					workerHost);
			failoverTaskInstance(processInstance, taskInstance);
			LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
		} catch (Exception ex) {
			LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
		} finally {
			LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
		}
	}
	failoverTimeCost.stop();
	LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
			workerHost,
			failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
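Steps 1 to 3 boil down to a per-task predicate. The sketch below illustrates that rule as described above; it is not the body of checkTaskInstanceNeedFailover, which the article does not show. TaskSnapshot and its fields are hypothetical, and an empty workerStartTime means the worker has not re-registered, i.e. it is still down.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical view of a task instance, reduced to what the check needs. */
final class TaskSnapshot {
    final boolean ownedByThisMaster; // the task's workflow instance is scheduled by this master
    final boolean finished;          // the task already reached a final state
    final Instant startTime;         // when the task instance started

    TaskSnapshot(boolean ownedByThisMaster, boolean finished, Instant startTime) {
        this.ownedByThisMaster = ownedByThisMaster;
        this.finished = finished;
        this.startTime = startTime;
    }
}

final class WorkerFailoverFilter {
    private WorkerFailoverFilter() {}

    static boolean needsFailover(Optional<Instant> workerStartTime, TaskSnapshot task) {
        if (!task.ownedByThisMaster) {
            return false;                 // other masters fail over their own workflows
        }
        if (task.finished) {
            return false;                 // nothing left to recover
        }
        if (!workerStartTime.isPresent()) {
            return true;                  // the worker is still down
        }
        // The worker came back: tasks started after the restart belong to the new process.
        return task.startTime.isBefore(workerStartTime.get());
    }
}
```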

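4-Update the taskInstance state to TaskExecutionStatus.NEED_FAULT_TOLERANCE (for a task that does not run on the master, its related YARN job is killed first if killYarnJobWhenTaskFailover is enabled). Then build a TaskStateEvent with status TaskExecutionStatus.NEED_FAULT_TOLERANCE and type TASK_STATE_CHANGE, and submit it.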

private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
	TaskMetrics.incTaskInstanceByState("failover");
	boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
	taskInstance.setProcessInstance(processInstance);
	if (!isMasterTask) {
		LOGGER.info("The failover taskInstance is not master task");
		TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
				.buildTaskInstanceRelatedInfo(taskInstance)
				.buildProcessInstanceRelatedInfo(processInstance)
				.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
				.create();
		if (masterConfig.isKillYarnJobWhenTaskFailover()) {
			// only kill yarn job if exists , the local thread has exited
			LOGGER.info("TaskInstance failover begin kill the task related yarn job");
			ProcessUtils.killYarnJob(logClient, taskExecutionContext);
		}
	} else {
		LOGGER.info("The failover taskInstance is a master task");
	}
	taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
	taskInstance.setFlag(Flag.NO);
	processService.saveTaskInstance(taskInstance);
	// submit the event
	TaskStateEvent stateEvent = TaskStateEvent.builder()
			.processInstanceId(processInstance.getId())
			.taskInstanceId(taskInstance.getId())
			.status(TaskExecutionStatus.NEED_FAULT_TOLERANCE)
			.type(StateEventType.TASK_STATE_CHANGE)
			.build();
	workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
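The state change and the event built by failoverTaskInstance can be modeled with plain objects. In this sketch TaskRow, FailoverEvent and markForFailover are hypothetical; the save callback stands in for processService.saveTaskInstance, and the available field stands in for the instance flag (we assume Flag.NO marks the old row as no longer the current attempt). The YARN-kill branch is omitted.

```java
import java.util.function.Consumer;

enum TaskExecutionState { RUNNING_EXECUTION, NEED_FAULT_TOLERANCE }

enum StateEventKind { TASK_STATE_CHANGE }

/** Hypothetical mutable task row. */
final class TaskRow {
    final int id;
    final int processInstanceId;
    TaskExecutionState state = TaskExecutionState.RUNNING_EXECUTION;
    boolean available = true;

    TaskRow(int id, int processInstanceId) {
        this.id = id;
        this.processInstanceId = processInstanceId;
    }
}

/** Immutable event, analogous to TaskStateEvent. */
final class FailoverEvent {
    final int processInstanceId;
    final int taskInstanceId;
    final TaskExecutionState status;
    final StateEventKind type;

    FailoverEvent(int processInstanceId, int taskInstanceId, TaskExecutionState status, StateEventKind type) {
        this.processInstanceId = processInstanceId;
        this.taskInstanceId = taskInstanceId;
        this.status = status;
        this.type = type;
    }
}

final class TaskFailover {
    private TaskFailover() {}

    static FailoverEvent markForFailover(TaskRow task, Consumer<TaskRow> save) {
        task.state = TaskExecutionState.NEED_FAULT_TOLERANCE;
        task.available = false;   // analogous to setFlag(Flag.NO)
        save.accept(task);        // persist before announcing the change
        return new FailoverEvent(task.processInstanceId, task.id,
                TaskExecutionState.NEED_FAULT_TOLERANCE, StateEventKind.TASK_STATE_CHANGE);
    }
}
```

Persisting before building the event means that whoever handles the event presumably reads a state that is already stored.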

Submitting the event looks up the WorkflowExecuteRunnable of the workflow instance that the task belongs to and hands the event to it:

public void submitStateEvent(StateEvent stateEvent) {
	WorkflowExecuteRunnable workflowExecuteThread =
			processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
	if (workflowExecuteThread == null) {
		logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
				stateEvent);
		return;
	}
	workflowExecuteThread.addStateEvent(stateEvent);
	logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
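submitStateEvent is a lookup followed by an enqueue: the event is routed by processInstanceId to the queue of the runnable that owns that workflow instance, and dropped with a warning when this master has none. A minimal sketch with hypothetical names (StateEventRouter, RoutedEvent), where one ConcurrentLinkedQueue per workflow instance plays the role of WorkflowExecuteRunnable's stateEvents queue:

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RoutedEvent {
    final int processInstanceId;
    final int taskInstanceId;

    RoutedEvent(int processInstanceId, int taskInstanceId) {
        this.processInstanceId = processInstanceId;
        this.taskInstanceId = taskInstanceId;
    }
}

final class StateEventRouter {
    // One event queue per running workflow instance, keyed by processInstanceId.
    private final Map<Integer, Queue<RoutedEvent>> queues = new ConcurrentHashMap<>();

    /** Called when a workflow instance starts running on this master. */
    Queue<RoutedEvent> register(int processInstanceId) {
        return queues.computeIfAbsent(processInstanceId, id -> new ConcurrentLinkedQueue<>());
    }

    /** Returns false when no runner owns the instance (the real code logs a warning). */
    boolean submit(RoutedEvent event) {
        Queue<RoutedEvent> queue = queues.get(event.processInstanceId);
        if (queue == null) {
            return false;
        }
        queue.add(event);
        return true;
    }
}
```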

Handling the failover event

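The code above submitted an event for every task that needs failover, so some thread must process those events. In the WorkflowExecuteRunnable class, submitStateEvent (through addStateEvent) places the event into the class's stateEvents queue: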

private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();

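There is one WorkflowExecuteRunnable per workflow instance running on the master (the cache lookup above is keyed by processInstanceId), and its handleEvents method takes events from stateEvents and processes them until the queue is empty: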

/**
 * handle event
 */
public void handleEvents() {
	if (!isStart()) {
		logger.info(
				"The workflow instance is not started, will not handle its state event, current state event size: {}",
				stateEvents);
		return;
	}
	StateEvent stateEvent = null;
	while (!this.stateEvents.isEmpty()) {
		try {
			stateEvent = this.stateEvents.peek();
			LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
					stateEvent.getTaskInstanceId());
			// if state handle success then will remove this state, otherwise will retry this state next time.
			// The state should always handle success except database error.
			checkProcessInstance(stateEvent);
			StateEventHandler stateEventHandler =
					StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
							.orElseThrow(() -> new StateEventHandleError(
									"Cannot find handler for the given state event"));
			logger.info("Begin to handle state event, {}", stateEvent);
			if (stateEventHandler.handleStateEvent(this, stateEvent)) {
				this.stateEvents.remove(stateEvent);
			}
		} catch (StateEventHandleError stateEventHandleError) {
			logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
			this.stateEvents.remove(stateEvent);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} catch (StateEventHandleException stateEventHandleException) {
			logger.error("State event handle error, will retry this event: {}",
					stateEvent,
					stateEventHandleException);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} catch (Exception e) {
			// we catch the exception here, since if the state event handle failed, the state event will still keep
			// in the stateEvents queue.
			logger.error("State event handle error, get a unknown exception, will retry this event: {}",
					stateEvent,
					e);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} finally {
			LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
		}
	}
}
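The core of handleEvents is a peek-handle-remove loop: an event leaves the queue only after it has been handled, is dropped on an unrecoverable error (StateEventHandleError), and stays at the head for another attempt on a retryable one (StateEventHandleException). The sketch below keeps that contract with hypothetical exception and handler types, and makes one deliberate change: instead of sleeping and retrying inside the same loop, it stops on a retryable failure and leaves the event for the next call, which keeps the example free of sleeps.

```java
import java.util.Queue;

/** Hypothetical: the event can never succeed, so it is dropped (cf. StateEventHandleError). */
class FatalEventException extends RuntimeException {
    FatalEventException(String message) { super(message); }
}

/** Hypothetical: the event may succeed later, so it is kept (cf. StateEventHandleException). */
class RetryableEventException extends Exception {
    RetryableEventException(String message) { super(message); }
}

interface EventHandler<E> {
    /** Returns true when the event was fully handled and can be removed. */
    boolean handle(E event) throws RetryableEventException;
}

final class EventDrainer {
    private EventDrainer() {}

    /** Drains the queue in order; returns how many events were removed (handled or dropped). */
    static <E> int drain(Queue<E> queue, EventHandler<E> handler) {
        int removed = 0;
        while (!queue.isEmpty()) {
            E event = queue.peek();      // look at the head without taking it yet
            try {
                if (!handler.handle(event)) {
                    return removed;      // not done: keep it at the head for the next call
                }
                queue.remove(event);     // handled: now it is safe to remove
                removed++;
            } catch (FatalEventException e) {
                queue.remove(event);     // will never succeed: drop it so later events can run
                removed++;
            } catch (RetryableEventException e) {
                return removed;          // keep it at the head; the caller retries later
            }
        }
        return removed;
    }
}
```

Removing only after success is what makes the loop safe against transient errors such as a database failure: an event is never lost between being taken and being handled.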

Given the event type StateEventType.TASK_STATE_CHANGE, the matching StateEventHandler implementation is TaskStateEventHandler. Its handleStateEvent method processes a task that needs failover as follows:

	if (task.getState().isFinished()) {
		if (completeTaskMap.containsKey(task.getTaskCode())
				&& completeTaskMap.get(task.getTaskCode()) == task.getId()) {
			logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
			return true;
		}
		workflowExecuteRunnable.taskFinished(task);
		if (task.getTaskGroupId() > 0) {
			logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
			workflowExecuteRunnable.releaseTaskGroup(task);
		}
		return true;
	}

The isFinished() check covers the fault-tolerance state: NEED_FAULT_TOLERANCE counts as a failure, and a failure counts as finished.

public boolean isFinished() {
	return isSuccess() || isKill() || isFailure() || isPause();
}
public boolean isFailure() {
	return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
}
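Because isFailure() includes NEED_FAULT_TOLERANCE, a failed-over task takes the "finished" branch of TaskStateEventHandler instead of waiting for a result from the dead worker. A trimmed-down enum showing only the members these two methods touch; the real TaskExecutionStatus has more states, and the one-line definitions of isSuccess, isKill and isPause here are assumptions:

```java
enum TaskStatusSketch {
    SUBMITTED_SUCCESS, RUNNING_EXECUTION, PAUSE, FAILURE, SUCCESS, NEED_FAULT_TOLERANCE, KILL;

    boolean isSuccess() { return this == SUCCESS; }

    boolean isKill() { return this == KILL; }

    boolean isPause() { return this == PAUSE; }

    // As in the source above: a fault-tolerance request counts as a failure...
    boolean isFailure() { return this == FAILURE || this == NEED_FAULT_TOLERANCE; }

    // ...and every failure counts as finished.
    boolean isFinished() { return isSuccess() || isKill() || isFailure() || isPause(); }
}
```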

It then calls workflowExecuteRunnable.taskFinished(task) to handle the consequences of the task instance's state change. Here we only look at the branch related to fault tolerance:

} else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
	// retry task
	logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
	retryTaskInstance(taskInstance);
}
// taskCanRetry checks the fault-tolerance state, which failoverTaskInstance already set
public boolean taskCanRetry() {
	if (this.isSubProcess()) {
		return false;
	}
	if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
		return true;
	}
	return this.getState() == TaskExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
}
/**
 * create new task instance to retry, different objects from the original
 *
 */
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
	if (!taskInstance.taskCanRetry()) {
		return;
	}
	TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
	if (newTaskInstance == null) {
		logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
				taskInstance.getTaskCode(),
				taskInstance.getId());
		return;
	}
	waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
	if (!taskInstance.retryTaskIntervalOverTime()) {
		logger.info(
				"Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
				processInstance.getId(), newTaskInstance.getTaskCode(),
				newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
				newTaskInstance.getRetryInterval());
		stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
		stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
	} else {
		addTaskToStandByList(newTaskInstance);
		submitStandByTask();
		waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
	}
}
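Two checks decide what happens to the failed-over task: taskCanRetry (shown above) returns true for NEED_FAULT_TOLERANCE regardless of the retry count, and retryTaskIntervalOverTime chooses between waiting (stateWheelExecuteThread) and immediate resubmission. The article does not show the second method; the sketch assumes, as we read the 3.1.x code, that only FAILURE tasks wait out a retry interval measured in minutes since the task ended. RetryDecision and its parameters are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

enum RetryState { FAILURE, NEED_FAULT_TOLERANCE }

final class RetryDecision {
    private RetryDecision() {}

    /** Mirrors taskCanRetry above: sub-process tasks never retry here. */
    static boolean canRetry(boolean isSubProcess, RetryState state, int retryTimes, int maxRetryTimes) {
        if (isSubProcess) {
            return false;
        }
        if (state == RetryState.NEED_FAULT_TOLERANCE) {
            return true;                          // failover ignores the retry budget
        }
        return state == RetryState.FAILURE && retryTimes < maxRetryTimes;
    }

    /** Assumed rule: only FAILURE waits; the interval is in minutes since the task's end time. */
    static boolean intervalOver(RetryState state, int maxRetryTimes, int retryIntervalMinutes,
                                Instant endTime, Instant now) {
        if (state != RetryState.FAILURE) {
            return true;                          // e.g. NEED_FAULT_TOLERANCE: resubmit at once
        }
        if (maxRetryTimes == 0 || retryIntervalMinutes == 0) {
            return true;
        }
        long elapsedSeconds = Duration.between(endTime, now).getSeconds();
        return retryIntervalMinutes * 60L < elapsedSeconds;
    }
}
```

Under that assumption, a failed-over task takes the else branch of retryTaskInstance and goes straight to addTaskToStandByList.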

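If the retry interval has not elapsed yet, the new task instance is handed to stateWheelExecuteThread for timeout and retry checks; otherwise it is added back to the readyToSubmitTaskQueue and submitted again right away: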

addTaskToStandByList(newTaskInstance);
submitStandByTask();

From there on it is handled like any normal task: submitTaskExec dispatches it to a worker for execution.

Summary

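The Worker fault-tolerance flow is roughly:

1-The Master learns which Worker nodes need failover by watching ZooKeeper.

2-Each Master only fails over the workflow instances it schedules itself. Before failing over a task, it compares the task instance's start time with the worker's startup time and skips tasks that started after the worker restarted.

3-Task instances that need failover are added back to readyToSubmitTaskQueue and submitted to run again.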

That concludes the analysis of Worker fault tolerance.
