
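DolphinScheduler Fault Tolerance: Master Source Code Analysis

 Updated: 2023-02-03 09:09:56   Author: leo的跟班  
This article walks through the source code behind Master fault tolerance in DolphinScheduler, for readers who want to understand how the scheduler recovers when a Master node goes down.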

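Introduction

Our product recently adopted DolphinScheduler as the task scheduling system for its big data workloads. As a distributed scheduler, DolphinScheduler uses a decentralized architecture of multiple peer Masters and multiple peer Workers, which avoids overloading a single Master. In such an architecture, fault-tolerance handling is essential whenever a Master or Worker goes down. The sections below describe how that handling works and analyze the relevant source code.

Fault-tolerance design

For the detailed design, refer to the official documentation.

Service fault tolerance relies on ZooKeeper's Watcher mechanism. Each Master watches the registry directories of the other Masters and of the Workers. When it receives a remove event, it performs workflow instance failover or task instance failover, depending on the business logic.

The main ZooKeeper paths are listed below. A quick look is enough for now; each of them appears in the source code later: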

public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";

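Official architecture diagram: see the official documentation.

Master fault-tolerance source code analysis

Master startup entry point

The run method of MasterServer contains the entry point for fault tolerance: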

@PostConstruct
public void run() throws SchedulerException {
	// init rpc server
	this.masterRPCServer.start();
	// install task plugin
	this.taskPluginManager.loadPlugin();
	// self tolerant
	this.masterRegistryClient.start();
	this.masterRegistryClient.setRegistryStoppable(this);
	this.masterSchedulerBootstrap.init();
	this.masterSchedulerBootstrap.start();
	this.eventExecuteService.start();
	this.failoverExecuteThread.start();
	this.schedulerApi.start();
	......
}

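Master startup consists of several main steps:

starting the Netty-based RPC server, loading the task plugins, initializing the fault-tolerance code, initializing task scheduling, and starting the task event processing thread.

Here we only care about the fault-tolerance code, masterRegistryClient.start():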

public void start() {
	try {
		this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
		// master registry
		registry();
		registryClient.addConnectionStateListener(
				new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
		registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
	} catch (Exception e) {
		throw new RegistryException("Master registry client start up error", e);
	}
}

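The start method above does three main things. Let's go through them one at a time.

Master registration at startup

The Master registers its own information in ZooKeeper and starts a heartbeat task that periodically refreshes that information in ZooKeeper.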

/**
 * Registry the current master server itself to registry.
 */
void registry() {
	logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
	String masterRegistryPath = masterConfig.getMasterRegistryPath();
	// remove before persist
	registryClient.remove(masterRegistryPath);
	registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
	while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
		logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
		ThreadUtils.sleep(SLEEP_TIME_MILLIS);
	}
	// sleep 1s, waiting master failover remove
	ThreadUtils.sleep(SLEEP_TIME_MILLIS);
	masterHeartBeatTask.start();
	logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}

The Master's registration path in ZooKeeper is built as follows:

masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
// /nodes/master + "/" + ip:listenPort
masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());

The registered data is mainly the Master's own health status, shown below, and it is refreshed periodically:

@Override
public MasterHeartBeat getHeartBeat() {
	return MasterHeartBeat.builder()
			.startupTime(ServerLifeCycleManager.getServerStartupTime())
			.reportTime(System.currentTimeMillis())
			.cpuUsage(OSUtils.cpuUsage())
			.loadAverage(OSUtils.loadAverage())
			.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
			.maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
			.reservedMemory(masterConfig.getReservedMemory())
			.memoryUsage(OSUtils.memoryUsage())
			.diskAvailable(OSUtils.diskAvailable())
			.processId(processId)
			.build();
}

Master listening and subscribing to cluster state

Listening for changes in the connection state between the ZooKeeper client and the cluster:

@Override
public void addConnectionStateListener(ConnectionListener listener) {
	client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}

When the connection between the client and the server changes, for example after a reconnect, the corresponding handler in MasterConnectionStateListener is invoked:

@Override
public void onUpdate(ConnectionState state) {
	logger.info("Master received a {} event from registry, the current server state is {}", state,
			ServerLifeCycleManager.getServerStatus());
	switch (state) {
		case CONNECTED:
			break;
		case SUSPENDED:
			break;
		case RECONNECTED:
			masterConnectStrategy.reconnect();
			break;
		case DISCONNECTED:
			masterConnectStrategy.disconnect();
			break;
		default:
	}
}

Subscribing to events under /nodes, the top-level directory under which Masters and Workers register. The event of interest here is Remove.

registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
@Override
public boolean subscribe(String path, SubscribeListener listener) {
	final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
	treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
	try {
		treeCache.start();
	} catch (Exception e) {
		treeCacheMap.remove(path);
		throw new RegistryException("Failed to subscribe listener for key: " + path, e);
	}
	return true;
}

The subscription is built on TreeCache from the Curator client, which watches data and path changes on a given ZooKeeper path as well as on all of its descendant nodes. When ZooKeeper reports a change, the notify method of MasterRegistryDataListener is eventually called back:

public void notify(Event event) {
	// path is the path that changed under /nodes; it may be /nodes/master/** or /nodes/worker/**
	final String path = event.path();
	if (Strings.isNullOrEmpty(path)) {
		return;
	}
	//monitor master
	if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
		handleMasterEvent(event);
	} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
		//monitor worker
		handleWorkerEvent(event);
	}
}

notify handles changes under /nodes/master/ and /nodes/worker/ differently.
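
The dispatch rule in notify can be tried out in isolation. The following is a minimal sketch, not DolphinScheduler code: NodeEventRouter, its EventType enum, and the two lists are hypothetical names, and for brevity the REMOVE check that the real handlers perform is folded into the router itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the prefix-based dispatch in notify. */
final class NodeEventRouter {
    static final String MASTERS = "/nodes/master";
    static final String WORKERS = "/nodes/worker";

    enum EventType { ADD, REMOVE, UPDATE }

    final List<String> failedMasters = new ArrayList<>();
    final List<String> failedWorkers = new ArrayList<>();

    void onEvent(EventType type, String path) {
        // Ignore empty paths and everything except removals.
        if (path == null || path.isEmpty() || type != EventType.REMOVE) {
            return;
        }
        // The trailing slash keeps the parent node itself from matching.
        if (path.startsWith(MASTERS + "/")) {
            failedMasters.add(path);
        } else if (path.startsWith(WORKERS + "/")) {
            failedWorkers.add(path);
        }
    }

    public static void main(String[] args) {
        NodeEventRouter router = new NodeEventRouter();
        router.onEvent(EventType.ADD, "/nodes/master/10.0.0.1:5678");
        router.onEvent(EventType.REMOVE, "/nodes/master/10.0.0.1:5678");
        router.onEvent(EventType.REMOVE, "/nodes/worker/10.0.0.2:1234");
        router.onEvent(EventType.REMOVE, "/nodes/master"); // parent node, ignored
        System.out.println("masters=" + router.failedMasters);
        System.out.println("workers=" + router.failedWorkers);
    }
}
```

Matching on the prefix plus a slash is what lets a single subscription on /nodes serve both node types.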

Master failover flow

Let's first look at what happens when something changes under /nodes/master/ in ZooKeeper:

private void handleMasterEvent(Event event) {
	final String path = event.path();
	switch (event.type()) {
		case ADD:
			logger.info("master node added : {}", path);
			break;
		case REMOVE:
			masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
			break;
		default:
			break;
	}
}

If a master node path is reported as removed, a Master node has failed, and the work it was running needs failover.

public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
	logger.info("{} node deleted : {}", nodeType, path);
	if (StringUtils.isEmpty(path)) {
		logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
		return;
	}
	// get the ip:port of the failed node
	String serverHost = registryClient.getHostByEventDataPath(path);
	if (StringUtils.isEmpty(serverHost)) {
		logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
		return;
	}
	try {
		if (!registryClient.exists(path)) {
			logger.info("path: {} not exists", path);
		}
		// failover server
		if (failover) {
			failoverService.failoverServerWhenDown(serverHost, nodeType);
		}
	} catch (Exception e) {
		logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
	}
}
/**
 * failover server when server down
 *
 * @param serverHost server host
 * @param nodeType   node type
 */
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
	switch (nodeType) {
		case MASTER:
			LOGGER.info("Master failover starting, masterServer: {}", serverHost);
			masterFailoverService.failoverMaster(serverHost);
			LOGGER.info("Master failover finished, masterServer: {}", serverHost);
			break;
		case WORKER:
			LOGGER.info("Worker failover staring, workerServer: {}", serverHost);
			workerFailoverService.failoverWorker(serverHost);
			LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
			break;
		default:
			break;
	}
}

Master failover eventually calls masterFailoverService.failoverMaster(serverHost), where serverHost is the ip:port of the failed Master node.
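
The article does not show getHostByEventDataPath, so the following is only a sketch of one plausible way to obtain ip:port from a registry path: take the last path segment. RegistryPathSketch and hostOf are hypothetical names, and the sample address is made up.

```java
/** Hypothetical helper: derives the host part from a registry event path. */
final class RegistryPathSketch {
    static String hostOf(String eventPath) {
        if (eventPath == null || eventPath.isEmpty()) {
            throw new IllegalArgumentException("path cannot be null or empty");
        }
        // "/nodes/master/10.0.0.1:5678" splits into ["", "nodes", "master", "10.0.0.1:5678"].
        String[] segments = eventPath.split("/");
        if (segments.length == 0) {
            throw new IllegalArgumentException("cannot parse path: " + eventPath);
        }
        return segments[segments.length - 1];
    }

    public static void main(String[] args) {
        System.out.println(hostOf("/nodes/master/10.0.0.1:5678")); // prints 10.0.0.1:5678
    }
}
```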

Because several Masters may observe the removal of the same failed Master, each one first acquires a distributed lock. The Master holding the lock then performs the failover:

public void failoverMaster(String masterHost) {
	String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
	try {
		registryClient.getLock(failoverPath);
		doFailoverMaster(masterHost);
	} catch (Exception e) {
		LOGGER.error("Master server failover failed, host:{}", masterHost, e);
	} finally {
		registryClient.releaseLock(failoverPath);
	}
}
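
To see why the lock matters, here is a minimal local sketch in which two surviving Masters react to the same removal. It is an illustration under stated assumptions, not the real implementation: a ReentrantLock per lock path stands in for the registry's distributed lock, an in-memory map stands in for the database, and the class name, lock path, and host are hypothetical. Unlike the excerpt above, the sketch acquires the lock before entering try, so that finally only ever releases a lock that is actually held.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class FailoverLockSketch {
    // Local stand-in for the registry's distributed lock: one lock per lock path.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Stand-in for the database: workflow instance ids still owned by each host.
    final Map<String, List<Integer>> instancesByHost = new ConcurrentHashMap<>();
    final List<Integer> recovered = Collections.synchronizedList(new ArrayList<>());

    void failoverMaster(String failedHost) {
        String lockPath = "/lock/failover/" + failedHost; // hypothetical lock path
        ReentrantLock lock = locks.computeIfAbsent(lockPath, p -> new ReentrantLock());
        lock.lock(); // acquired before try, so finally never unlocks an unheld lock
        try {
            // Query, then mark as handled: two separate steps that the lock makes atomic.
            List<Integer> owned = instancesByHost.get(failedHost);
            if (owned != null) {
                recovered.addAll(owned);
                instancesByHost.remove(failedHost);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Two surviving masters handle the same failed host; returns the recovered ids. */
    static List<Integer> demo() {
        FailoverLockSketch sketch = new FailoverLockSketch();
        sketch.instancesByHost.put("10.0.0.1:5678", Arrays.asList(1, 2, 3));
        Thread a = new Thread(() -> sketch.failoverMaster("10.0.0.1:5678"));
        Thread b = new Thread(() -> sketch.failoverMaster("10.0.0.1:5678"));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.recovered;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]: each instance recovered once
    }
}
```

Whichever thread gets the lock second finds nothing left to do, which mirrors the way a second Master would find no instances still bound to the failed host.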

The overall failover process is roughly as follows:

Failover master, will failover process instance and associated task instance. When the process instance belongs to the given masterHost and the restartTime is before the current server start up time, then the process instance will be failovered.

1-First, query the database by the failed node's masterHost for all workflow instances and task instances that need failover.

2-Next, compare each workflow instance's start time with the startup time of the masterHost node. Instances that started after the node's startup time are skipped. If the node has not restarted yet, all of its instances need failover.

3-Traverse all task instances under the workflow instance and fail them over.

4-Set the workflow instance's host to NULL and insert a Command of type RECOVER_TOLERANCE_FAULT_PROCESS into the command table.

private void doFailoverMaster(@NonNull String masterHost) {
	StopWatch failoverTimeCost = StopWatch.createStarted();
	Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
			masterHost);
	// 1-query all workflow instances by the failed node's masterHost
	List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
			masterHost);
	if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
		return;
	}
	LOGGER.info(
			"Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
			masterHost,
			needFailoverProcessInstanceList.size(),
			needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
	for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
		try {
			LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
			LOGGER.info("WorkflowInstance failover starting");
			// 2-check whether the workflow instance's start time meets the failover condition
			if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
				LOGGER.info("WorkflowInstance doesn't need to failover");
				continue;
			}
			// todo: use batch query
			ProcessDefinition processDefinition =
					processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
							processInstance.getProcessDefinitionVersion());
			processInstance.setProcessDefinition(processDefinition);
			int processInstanceId = processInstance.getId();
			List<TaskInstance> taskInstanceList =
					processService.findValidTaskListByProcessId(processInstanceId);
			// 3-fail over the task instances; see failoverTaskInstance for details
			for (TaskInstance taskInstance : taskInstanceList) {
				try {
					LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
					LOGGER.info("TaskInstance failover starting");
					if (!checkTaskInstanceNeedFailover(taskInstance)) {
						LOGGER.info("The taskInstance doesn't need to failover");
						continue;
					}
					failoverTaskInstance(processInstance, taskInstance);
					LOGGER.info("TaskInstance failover finished");
				} finally {
					LoggerUtils.removeTaskInstanceIdMDC();
				}
			}
			// 4-insert a failover command
			ProcessInstanceMetrics.incProcessInstanceByState("failover");
			// updateProcessInstance host is null to mark this processInstance has been failover
			// and insert a failover command
			processInstance.setHost(Constants.NULL);
			processService.processNeedFailoverProcessInstances(processInstance);
			LOGGER.info("WorkflowInstance failover finished");
		} finally {
			LoggerUtils.removeWorkflowInstanceIdMDC();
		}
	}
	failoverTimeCost.stop();
	LOGGER.info("Master[{}] failover finished, useTime:{}ms",
			masterHost,
			failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
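
The body of checkProcessInstanceNeedFailover is not shown in this article. The sketch below is a reconstruction based only on step 2 and the quoted Javadoc, so treat every detail as an assumption: the class and method names are hypothetical, the marker string "NULL" for an already failed-over host is assumed, and the real method works on a ProcessInstance rather than on loose parameters.

```java
import java.util.Date;
import java.util.Optional;

/** Hypothetical sketch of the "does this workflow instance need failover" decision. */
final class FailoverCheckSketch {
    static final String NULL_HOST = "NULL"; // assumed marker for "already failed over"

    static boolean needsFailover(Optional<Date> masterStartupTime,
                                 String instanceHost,
                                 Date instanceStartTime,
                                 Date instanceRestartTime) {
        if (NULL_HOST.equals(instanceHost)) {
            return false; // another master already handled this instance
        }
        if (!masterStartupTime.isPresent()) {
            return true; // the failed master has not come back: recover everything
        }
        Date startup = masterStartupTime.get();
        if (instanceStartTime.after(startup)) {
            return false; // created by the restarted master, so it is healthy
        }
        if (instanceRestartTime != null && instanceRestartTime.after(startup)) {
            return false; // already picked up again after the restart
        }
        return true;
    }

    public static void main(String[] args) {
        Optional<Date> restartedAt = Optional.of(new Date(2_000L));
        System.out.println(needsFailover(Optional.empty(), "10.0.0.1:5678", new Date(1_000L), null)); // true
        System.out.println(needsFailover(restartedAt, "10.0.0.1:5678", new Date(3_000L), null)); // false
        System.out.println(needsFailover(restartedAt, "10.0.0.1:5678", new Date(1_000L), null)); // true
    }
}
```

The time comparison exists because a Master can restart on the same ip:port before the removal event is processed; without it, instances created by the new process would be recovered by mistake.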

Constructing the Command, with type RECOVER_TOLERANCE_FAULT_PROCESS:

/**
 * process need failover process instance
 *
 * @param processInstance processInstance
 */
@Override
@Transactional
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
	// 1 update processInstance host is null
	processInstance.setHost(Constants.NULL);
	processInstanceMapper.updateById(processInstance);
	ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(),
			processInstance.getProcessDefinitionVersion());
	// 2 insert into recover command
	Command cmd = new Command();
	cmd.setProcessDefinitionCode(processDefinition.getCode());
	cmd.setProcessDefinitionVersion(processDefinition.getVersion());
	cmd.setProcessInstanceId(processInstance.getId());
	cmd.setCommandParam(
			String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
	cmd.setExecutorId(processInstance.getExecutorId());
	cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
	cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
	createCommand(cmd);
}
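
The commandParam built by String.format above is a one-field JSON object. A small sketch of the resulting string follows. The key "ProcessInstanceId" is an assumed value for CMD_PARAM_RECOVER_PROCESS_ID_STRING, which this article does not show, and the class name is hypothetical.

```java
/** Hypothetical sketch of the recover command parameter string. */
final class RecoverCommandParamSketch {
    // Assumed value of CMD_PARAM_RECOVER_PROCESS_ID_STRING; not shown in the article.
    static final String RECOVER_PROCESS_ID_KEY = "ProcessInstanceId";

    static String recoverParam(int processInstanceId) {
        // Same format string as in processNeedFailoverProcessInstances.
        return String.format("{\"%s\":%d}", RECOVER_PROCESS_ID_KEY, processInstanceId);
    }

    public static void main(String[] args) {
        System.out.println(recoverParam(42)); // prints {"ProcessInstanceId":42}
    }
}
```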

Handling the task instances of the workflow instance, setting their state to TaskExecutionStatus.NEED_FAULT_TOLERANCE:

private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
	TaskMetrics.incTaskInstanceByState("failover");
	boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
	taskInstance.setProcessInstance(processInstance);
	if (!isMasterTask) {
		LOGGER.info("The failover taskInstance is not master task");
		TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
				.buildTaskInstanceRelatedInfo(taskInstance)
				.buildProcessInstanceRelatedInfo(processInstance)
				.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
				.create();
		if (masterConfig.isKillYarnJobWhenTaskFailover()) {
			// only kill yarn job if exists , the local thread has exited
			LOGGER.info("TaskInstance failover begin kill the task related yarn job");
			ProcessUtils.killYarnJob(logClient, taskExecutionContext);
		}
		// kill worker task, When the master failover and worker failover happened in the same time,
		// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
		// This can be improved if we can load all task when cache a workflowInstance in memory
		sendKillCommandToWorker(taskInstance);
	} else {
		LOGGER.info("The failover taskInstance is a master task");
	}
	taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
	processService.saveTaskInstance(taskInstance);
}

Rescheduling the failed-over workflow

As described earlier, after a Master starts it launches a MasterSchedulerBootstrap thread to schedule work. In DolphinScheduler, whether a task is scheduled, one-off, or being failed over, a command is inserted into the command table when it is due to run. The job of the MasterSchedulerBootstrap thread is to keep fetching pending commands from the command table and schedule them.

/**
 * run of MasterSchedulerService
 */
@Override
public void run() {
	while (!ServerLifeCycleManager.isStopped()) {
		try {
			if (!ServerLifeCycleManager.isRunning()) {
				// the current server is not at running status, cannot consume command.
				logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
			}
			// todo: if the workflow event queue is much, we need to handle the back pressure
			boolean isOverload =
					OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
			if (isOverload) {
				logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
				MasterServerMetrics.incMasterOverload();
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			List<Command> commands = findCommands();
			if (CollectionUtils.isEmpty(commands)) {
				// indicate that no command ,sleep for 1s
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			List<ProcessInstance> processInstances = command2ProcessInstance(commands);
			if (CollectionUtils.isEmpty(processInstances)) {
				// indicate that the command transform to processInstance error, sleep for 1s
				Thread.sleep(Constants.SLEEP_TIME_MILLIS);
				continue;
			}
			MasterServerMetrics.incMasterConsumeCommand(commands.size());
			processInstances.forEach(processInstance -> {
				try {
					LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
					if (processInstanceExecCacheManager.contains(processInstance.getId())) {
						logger.error(
								"The workflow instance is already been cached, this case shouldn't be happened");
					}
					WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
							processService,
							processInstanceDao,
							nettyExecutorManager,
							processAlertManager,
							masterConfig,
							stateWheelExecuteThread,
							curingGlobalParamsService);
					processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
					workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
							processInstance.getId()));
				} finally {
					LoggerUtils.removeWorkflowInstanceIdMDC();
				}
			});
		} catch (InterruptedException interruptedException) {
			logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
			Thread.currentThread().interrupt();
			break;
		} catch (Exception e) {
			logger.error("Master schedule workflow error", e);
			// sleep for 1s here to avoid the database down cause the exception boom
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		}
	}
}

The code above fetches commands from the database in batches of 10 (the default), then builds workflow instances from them and processes each one in turn.
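
The batching behaviour can be sketched with an in-memory queue standing in for the command table. All names here are hypothetical. The sketch also stops once the table is empty, whereas the real loop sleeps for a second and polls again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of fetching commands in fixed-size batches. */
final class CommandPollerSketch {
    static final int FETCH_SIZE = 10; // the default batch size mentioned above

    // In-memory stand-in for the command table, oldest command first.
    private final Deque<Integer> commandTable = new ArrayDeque<>();

    void insertCommand(int commandId) {
        commandTable.addLast(commandId);
    }

    /** Removes and returns up to FETCH_SIZE commands. */
    List<Integer> findCommands() {
        List<Integer> batch = new ArrayList<>();
        while (batch.size() < FETCH_SIZE && !commandTable.isEmpty()) {
            batch.add(commandTable.pollFirst());
        }
        return batch;
    }

    /** Polls until the table is empty and returns the size of each batch. */
    static List<Integer> batchSizes(int totalCommands) {
        CommandPollerSketch poller = new CommandPollerSketch();
        for (int id = 1; id <= totalCommands; id++) {
            poller.insertCommand(id);
        }
        List<Integer> sizes = new ArrayList<>();
        List<Integer> batch;
        while (!(batch = poller.findCommands()).isEmpty()) {
            sizes.add(batch.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(25)); // prints [10, 10, 5]
    }
}
```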

The command2ProcessInstance method deserves attention: it converts the fetched commands into workflow instances. Internally it ends up calling constructProcessInstance, from within handleCommand, to build the workflow instance. That method records the host of the node now handling the instance and applies special settings when the commandType is the fault-tolerance type RECOVER_TOLERANCE_FAULT_PROCESS (some code omitted):

protected @Nullable ProcessInstance constructProcessInstance(Command command,
															 String host) throws CronParseException, CodeGenerateException {
	ProcessInstance processInstance;
	ProcessDefinition processDefinition;
	CommandType commandType = command.getCommandType();
	processDefinition =
			this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
	if (processDefinition == null) {
		logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
		throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
	}
	Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
	int processInstanceId = command.getProcessInstanceId();
	if (processInstanceId == 0) {
		processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
	} else {
		processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
		if (processInstance == null) {
			return null;
		}
	}
	if (cmdParam != null) {
		......
	}
	// reset command parameter
	if (processInstance.getCommandParam() != null) {
		Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
		processCmdParam.forEach((key, value) -> {
			if (!cmdParam.containsKey(key)) {
				cmdParam.put(key, value);
			}
		});
	}
	// reset command parameter if sub process
	if (cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_SUB_PROCESS)) {
		processInstance.setCommandParam(command.getCommandParam());
	}
	if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
		logger.error("command parameter check failed!");
		return null;
	}
	if (command.getScheduleTime() != null) {
		processInstance.setScheduleTime(command.getScheduleTime());
	}
	// set the host of the handling node and the restartTime
	processInstance.setHost(host);
	processInstance.setRestartTime(new Date());
	WorkflowExecutionStatus runStatus = WorkflowExecutionStatus.RUNNING_EXECUTION;
	int runTime = processInstance.getRunTimes();
	switch (commandType) {
		case START_PROCESS:
			break;
		case START_FAILURE_TASK_PROCESS:
			// find failed tasks and init these tasks
			......
			break;
		case START_CURRENT_TASK_PROCESS:
			break;
		case RECOVER_WAITING_THREAD:
			break;
		case RECOVER_SUSPENDED_PROCESS:
			// find pause tasks and init task's state
			......
			break;
		// for fault-tolerance commands, recovery is set to Flag.YES here
		case RECOVER_TOLERANCE_FAULT_PROCESS:
			// recover tolerance fault process
			processInstance.setRecovery(Flag.YES);
			processInstance.setRunTimes(runTime + 1);
			runStatus = processInstance.getState();
			break;
		case COMPLEMENT_DATA:
			// delete all the valid tasks when complement data if id is not null
			......
			break;
		case REPEAT_RUNNING:
			// delete the recover task names from command parameter
			......
			break;
		case SCHEDULER:
			break;
		default:
			break;
	}
	processInstance.setStateWithDesc(runStatus, commandType.getDescp());
	return processInstance;
}

Next, a start event for each workflow instance is added to a queue.

private static final LinkedBlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>();
/**
 * Add a workflow event.
 */
public void addEvent(WorkflowEvent workflowEvent) {
	workflowEventQueue.add(workflowEvent);
	logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
}
/**
 * Pool the head of the workflow event queue and wait an workflow event.
 */
public WorkflowEvent poolEvent() throws InterruptedException {
	return workflowEventQueue.take();
}

Where there is a producer there is a consumer: the consumer thread, WorkflowEventLooper, is already started during Master startup.

The entry point is this.masterSchedulerBootstrap.start():

@Override
public synchronized void start() {
	logger.info("Master schedule bootstrap starting..");
	super.start();
	workflowEventLooper.start();
	logger.info("Master schedule bootstrap started...");
}

The consumption logic:

public void run() {
	WorkflowEvent workflowEvent = null;
	while (!ServerLifeCycleManager.isStopped()) {
		try {
			workflowEvent = workflowEventQueue.poolEvent();
			LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
			logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
			WorkflowEventHandler workflowEventHandler =
					workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
			workflowEventHandler.handleWorkflowEvent(workflowEvent);
		} catch (InterruptedException e) {
			logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
			Thread.currentThread().interrupt();
			break;
		} catch (WorkflowEventHandleException workflowEventHandleException) {
			logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
					workflowEvent, workflowEventHandleException);
			workflowEventQueue.addEvent(workflowEvent);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} catch (WorkflowEventHandleError workflowEventHandleError) {
			logger.error("Handle workflow event error, will drop this event, event: {}",
					workflowEvent,
					workflowEventHandleError);
		} catch (Exception unknownException) {
			logger.error(
					"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
					workflowEvent, unknownException);
			workflowEventQueue.addEvent(workflowEvent);
			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
		} finally {
			LoggerUtils.removeWorkflowInstanceIdMDC();
		}
	}
}

As shown, the loop takes each event from the queue and hands it to the matching handler. For a start event this ends up in WorkflowStartEventHandler:
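
The queue, handler map, and requeue-on-failure pattern can be reduced to a small self-contained sketch. Everything in it is hypothetical (class names, the RetryableException type, the simulated failure). It also uses poll() and stops when the queue is empty so that the example terminates, whereas the loop above blocks on take().

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of an event loop that requeues events after retryable failures. */
final class EventLooperSketch {
    enum EventType { START_WORKFLOW }

    static final class Event {
        final EventType type;
        final int workflowInstanceId;

        Event(EventType type, int workflowInstanceId) {
            this.type = type;
            this.workflowInstanceId = workflowInstanceId;
        }
    }

    /** Hypothetical marker for failures that are worth retrying. */
    static final class RetryableException extends Exception {
        RetryableException(String message) {
            super(message);
        }
    }

    interface Handler {
        void handle(Event event) throws RetryableException;
    }

    final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    final Map<EventType, Handler> handlers = new EnumMap<>(EventType.class);

    /** Simplification: polls until the queue is empty instead of blocking forever. */
    void drain() {
        Event event;
        while ((event = queue.poll()) != null) {
            try {
                handlers.get(event.type).handle(event);
            } catch (RetryableException e) {
                queue.add(event); // put the event back so it is handled again later
            }
        }
    }

    static List<Integer> demo() {
        EventLooperSketch looper = new EventLooperSketch();
        List<Integer> started = new ArrayList<>();
        Set<Integer> failedOnce = new HashSet<>();
        looper.handlers.put(EventType.START_WORKFLOW, event -> {
            // Workflow 2 fails on its first attempt only.
            if (event.workflowInstanceId == 2 && failedOnce.add(2)) {
                throw new RetryableException("transient failure");
            }
            started.add(event.workflowInstanceId);
        });
        for (int id = 1; id <= 3; id++) {
            looper.queue.add(new Event(EventType.START_WORKFLOW, id));
        }
        looper.drain();
        return started;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3, 2]
    }
}
```

Because a failed event goes to the back of the queue, workflow 2 starts after workflow 3. A requeue changes ordering but does not lose the event.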

@Override
public void handleWorkflowEvent(final WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
	logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
	WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
		workflowEvent.getWorkflowInstanceId());
	if (workflowExecuteRunnable == null) {
		throw new WorkflowEventHandleError(
			"The workflow start event is invalid, cannot find the workflow instance from cache");
	}
	ProcessInstanceMetrics.incProcessInstanceByState("submit");
	ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
	CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool)
		.thenAccept(workflowSubmitStatue -> {
			if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
				// submit failed will resend the event to workflow event queue
				logger.info("Success submit the workflow instance");
				if (processInstance.getTimeout() > 0) {
					stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
				}
			} else {
				logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
							 workflowEvent);
				workflowEventQueue.addEvent(workflowEvent);
			}
		});
}

Finally, the tasks in the workflow instance are submitted:

the DAG of the workflow instance is built, the queue is initialized, and the head nodes of the DAG are submitted.

@Override
public WorkflowSubmitStatue call() {
	if (isStart()) {
		// This case should not been happened
		logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
		return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
	}
	try {
		LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
		if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
			buildFlowDag();
			workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
			initTaskQueue();
			workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
			submitPostNode(null);
			workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
			logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
		}
		return WorkflowSubmitStatue.SUCCESS;
	} catch (Exception e) {
		logger.error("Start workflow error", e);
		return WorkflowSubmitStatue.FAILED;
	} finally {
		LoggerUtils.removeWorkflowInstanceIdMDC();
	}
}
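
One consequence of guarding each stage with a status check is that a retried call() resumes at the stage that failed rather than starting over. The sketch below illustrates that idea with hypothetical names and a simulated one-off failure in the second stage. It is not the WorkflowExecuteRunnable implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a staged start-up that resumes where it failed. */
final class ResumableStartSketch {
    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    final List<String> executed = new ArrayList<>();
    private boolean failQueueInitOnce = true; // simulates one transient failure

    boolean call() {
        try {
            if (status == Status.CREATED) {
                executed.add("buildFlowDag");
                status = Status.INITIALIZE_DAG;
            }
            if (status == Status.INITIALIZE_DAG) {
                if (failQueueInitOnce) {
                    failQueueInitOnce = false;
                    throw new IllegalStateException("simulated failure in initTaskQueue");
                }
                executed.add("initTaskQueue");
                status = Status.INITIALIZE_QUEUE;
            }
            if (status == Status.INITIALIZE_QUEUE) {
                executed.add("submitPostNode");
                status = Status.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            return false; // the caller is expected to retry
        }
    }

    public static void main(String[] args) {
        ResumableStartSketch workflow = new ResumableStartSketch();
        System.out.println(workflow.call());   // prints false
        System.out.println(workflow.call());   // prints true
        System.out.println(workflow.executed); // prints [buildFlowDag, initTaskQueue, submitPostNode]
    }
}
```

In this run buildFlowDag executes only once, even though call() is invoked twice.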

So where are the tasks that need fault tolerance handled? Take a look at the initTaskQueue method.

It first calls isNewProcessInstance to determine whether this is a new workflow instance:

private boolean isNewProcessInstance() {
	if (Flag.YES.equals(processInstance.getRecovery())) {
		logger.info("This workInstance will be recover by this execution");
		return false;
	}
	if (WorkflowExecutionStatus.RUNNING_EXECUTION == processInstance.getState()
			&& processInstance.getRunTimes() == 1) {
		return true;
	}
	logger.info(
			"The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
			processInstance.getState(),
			processInstance.getRunTimes());
	return false;
}

As shown above, if the recovery property of the workflow instance equals Flag.YES, the method returns false. Recall that recovery was set to Flag.YES when the fault-tolerance command was converted into a ProcessInstance. So the method returns false, and the following branch runs:

 if (!isNewProcessInstance()) {
		logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
				processInstance.getRunTimes(),
				processInstance.getRecovery());
		List<TaskInstance> validTaskInstanceList =
				processService.findValidTaskListByProcessId(processInstance.getId());
		for (TaskInstance task : validTaskInstanceList) {
			try {
				LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
				logger.info(
						"Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
						task.getTaskCode(),
						task.getState());
				if (validTaskMap.containsKey(task.getTaskCode())) {
					logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
							task.getTaskCode());
					int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
					TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
					if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
						task.setFlag(Flag.NO);
						processService.updateTaskInstance(task);
						continue;
					}
				}
				validTaskMap.put(task.getTaskCode(), task.getId());
				taskInstanceMap.put(task.getId(), task);
				if (task.isTaskComplete()) {
					logger.info("TaskInstance is already complete.");
					completeTaskMap.put(task.getTaskCode(), task.getId());
					continue;
				}
				if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
						dag)) {
					continue;
				}
				if (task.taskCanRetry()) {
					if (task.getState().isNeedFaultTolerance()) {
						logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
						task.setFlag(Flag.NO);
						processService.updateTaskInstance(task);
						// tolerantTaskInstance add to standby list directly
						TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
						addTaskToStandByList(tolerantTaskInstance);
					} else {
						logger.info("Retry taskInstance, taskState: {}", task.getState());
						retryTaskInstance(task);
					}
					continue;
				}
				if (task.getState().isFailure()) {
					errorTaskMap.put(task.getTaskCode(), task.getId());
				}
			} finally {
				LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
			}
		}
	} else {
		logger.info("The current workflowInstance is a newly running workflowInstance");
	}

This branch processes all task instances under the workflow instance. Completed tasks are added to completeTaskMap. The call task.getState().isNeedFaultTolerance() distinguishes tasks that need fault tolerance from tasks that are simply being retried. Fault-tolerance tasks are added to the readyToSubmitTaskQueue queue (tolerantTaskInstance add to standby list directly).
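
The triage performed by this branch can be sketched with a reduced state model. This is a deliberately simplified illustration: the three states, the retriesLeft field, and all class and list names are hypothetical, and checks such as duplicate task codes and condition tasks are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified sketch of sorting existing task instances on recovery. */
final class RecoveryTriageSketch {
    enum State { SUCCESS, FAILURE, NEED_FAULT_TOLERANCE }

    static final class Task {
        final long code;
        final State state;
        final int retriesLeft;

        Task(long code, State state, int retriesLeft) {
            this.code = code;
            this.state = state;
            this.retriesLeft = retriesLeft;
        }
    }

    final List<Long> complete = new ArrayList<>(); // finished, nothing to do
    final List<Long> standby = new ArrayList<>();  // fault-tolerance tasks, resubmitted directly
    final List<Long> retried = new ArrayList<>();  // ordinary retries
    final List<Long> errors = new ArrayList<>();   // failed with no retries left

    void triage(List<Task> tasks) {
        for (Task task : tasks) {
            if (task.state == State.SUCCESS) {
                complete.add(task.code);
                continue;
            }
            boolean canRetry = task.state == State.NEED_FAULT_TOLERANCE || task.retriesLeft > 0;
            if (canRetry) {
                if (task.state == State.NEED_FAULT_TOLERANCE) {
                    standby.add(task.code);
                } else {
                    retried.add(task.code);
                }
                continue;
            }
            errors.add(task.code);
        }
    }

    public static void main(String[] args) {
        RecoveryTriageSketch sketch = new RecoveryTriageSketch();
        sketch.triage(Arrays.asList(
                new Task(101L, State.SUCCESS, 0),
                new Task(102L, State.NEED_FAULT_TOLERANCE, 0),
                new Task(103L, State.FAILURE, 1),
                new Task(104L, State.FAILURE, 0)));
        System.out.println("complete=" + sketch.complete + " standby=" + sketch.standby
                + " retried=" + sketch.retried + " errors=" + sketch.errors);
    }
}
```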

Finally, submitPostNode triggers execution of the task instances in the workflow instance. From this point on, they are handled just like ordinary tasks.

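Summary

Master failover can be broken down into three parts:

1-Determining the failover scope: done entirely under a lock, using the failed node's host to find which workflow instances need failover.

2-Failover handling: covers both workflow instances and task instances. Before failover, the instance start time is compared with the node's startup time, and instances that started after the node's startup are skipped. Finally, a command is written to the command table.

3-Scheduling the failed-over work: when the scheduling thread picks up the command, it rebuilds the workflow instance, initializes the task instances into the appropriate queues according to the fault-tolerance type, and schedules them again.

See the official documentation for reference.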
