PowerJob MapProcessor Workflow: A Source Code Walkthrough
Preface
This article takes a look at PowerJob's MapProcessor.
[Figure: PowerJob task and workflow configuration]

MapProcessor
tech/powerjob/worker/core/processor/sdk/MapProcessor.java
public interface MapProcessor extends BasicProcessor {
    Logger log = LoggerFactory.getLogger(MapProcessor.class);
    int RECOMMEND_BATCH_SIZE = 200;
    /**
     * Dispatch sub-tasks
     * @param taskList sub-tasks; when they are executed later, each one is available via TaskContext#getSubTask
     * @param taskName sub-task name, i.e. the value the sub-task processor gets from TaskContext#getTaskName
     * @throws PowerJobCheckedException thrown if the map fails
     */
    default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {
        if (CollectionUtils.isEmpty(taskList)) {
            return;
        }
        TaskDO task = ThreadLocalStore.getTask();
        WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();
        if (taskList.size() > RECOMMEND_BATCH_SIZE) {
            log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
        }
        // Fix for map task names that equal the root or last task name (causes endless sub-task generation or outright failure)
        if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
            log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", task.getInstanceId(), taskName, taskName);
            taskName = "X-" + taskName;
        }
        // 1. Build the request
        ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
        // 2. Send the request reliably (tasks must not be lost, so use the ask method and throw on failure)
        boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);
        if (requestSucceed) {
            log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
        } else {
            throw new PowerJobCheckedException("map failed for task: " + taskName);
        }
    }
    /**
     * Whether the current task is the root task
     * @return true -> root task / false -> non-root task
     */
    default boolean isRootTask() {
        TaskDO task = ThreadLocalStore.getTask();
        return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
    }
}
MapProcessor extends BasicProcessor and provides a default map method for dispatching sub-tasks: it builds a ProcessorMapTaskRequest and sends it via TransportUtils.reliableMapTask, throwing PowerJobCheckedException if the request fails. Before sending, it only logs a warning when more than RECOMMEND_BATCH_SIZE (200) sub-tasks are mapped at once, and it renames a task name that collides with the reserved root/last task names by prefixing it with "X-". The interface also provides isRootTask to check whether the current task is the root task.
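Since map only warns above RECOMMEND_BATCH_SIZE, a processor fanning out a large list may want to split it itself. Below is a minimal, self-contained sketch (not PowerJob code): `MapBatching`, `partition` and `sanitizeTaskName` are hypothetical helpers, and the reserved names are copied from the warning message in map.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers mirroring the pre-send checks in MapProcessor#map; not part of PowerJob.
final class MapBatching {

    // Same value as MapProcessor.RECOMMEND_BATCH_SIZE in the source above.
    static final int RECOMMEND_BATCH_SIZE = 200;

    // Reserved names, taken from the warning message logged by MapProcessor#map.
    static final String ROOT_TASK_NAME = "OMS_ROOT_TASK";
    static final String LAST_TASK_NAME = "OMS_LAST_TASK";

    private MapBatching() {}

    // Splits a list into consecutive chunks of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Same renaming rule as map(): a reserved name gets an "X-" prefix.
    static String sanitizeTaskName(String taskName) {
        if (ROOT_TASK_NAME.equals(taskName) || LAST_TASK_NAME.equals(taskName)) {
            return "X-" + taskName;
        }
        return taskName;
    }
}
```

Inside a MapProcessor, the root-task branch could then loop over `MapBatching.partition(items, MapBatching.RECOMMEND_BATCH_SIZE)` and call `map(batch, "SUB_TASK")` once per batch ("SUB_TASK" is just an example name). This keeps each ProcessorMapTaskRequest under the recommended size.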
ProcessorMapTaskRequest
tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java
@Getter
@NoArgsConstructor
public class ProcessorMapTaskRequest implements PowerSerializable {
    private Long instanceId;
    private Long subInstanceId;
    private String taskName;
    private List<SubTask> subTasks;
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SubTask {
        private String taskId;
        private byte[] taskContent;
    }
    public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {
        this.instanceId = parentTask.getInstanceId();
        this.subInstanceId = parentTask.getSubInstanceId();
        this.taskName = taskName;
        this.subTasks = Lists.newLinkedList();
        subTaskList.forEach(subTask -> {
            // A single task may call map several times, so the ID must be unique at the thread level
            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
            // The class name is written too, to make deserialization easier
            subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
        });
    }
}
The ProcessorMapTaskRequest constructor converts subTaskList into a list of SubTask objects. Each sub-task ID is the parent task ID, a ".", and a per-thread counter, and each user-defined sub-task is serialized with SerializerUtils.serialize (Kryo).
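The ID scheme in the constructor can be reproduced with a ThreadLocal<AtomicLong>. A minimal sketch under stated assumptions: `SubTaskIds` is hypothetical, its thread-local stands in for ThreadLocalStore.getTaskIDAddr(), the counter is assumed to start at 0, and the parent ID "0" is used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the sub-task ID scheme: parentTaskId + "." + per-thread counter.
final class SubTaskIds {

    // Stand-in for ThreadLocalStore.getTaskIDAddr(); assumed to start at 0 in each thread.
    private static final ThreadLocal<AtomicLong> TASK_ID_ADDR =
            ThreadLocal.withInitial(AtomicLong::new);

    private SubTaskIds() {}

    // Generates one ID per sub-task; the counter continues across repeated calls in the same thread.
    static List<String> assign(String parentTaskId, int count) {
        List<String> ids = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            ids.add(parentTaskId + "." + TASK_ID_ADDR.get().getAndIncrement());
        }
        return ids;
    }

    // Drops the counter, e.g. before the thread starts on an unrelated task.
    static void reset() {
        TASK_ID_ADDR.remove();
    }
}
```

Because the counter lives in the thread and is not reset between calls, a second map from the same task continues at "0.2" instead of producing a duplicate "0.0", which is what the thread-level uniqueness comment guards against.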
reliableMapTask
tech/powerjob/worker/common/utils/TransportUtils.java
public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
    try {
        return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
    } catch (Throwable throwable) {
        throw new PowerJobCheckedException(throwable);
    }
}
private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
    final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
    final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
    return completionStage
            .toCompletableFuture()
            .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
reliableMapTask uses reliableAsk to send the request to the taskTracker/mapTask endpoint and blocks until the AskResponse arrives, with a default timeout of 5s (RemoteConstant.DEFAULT_TIMEOUT_MS). Any failure, including a timeout, is rethrown as PowerJobCheckedException.
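reliableAsk turns the asynchronous transporter.ask into a blocking call by waiting on the future with a timeout. A minimal sketch of that pattern using only java.util.concurrent; `BlockingAsk` and `AskFailedException` are hypothetical names, the latter standing in for PowerJobCheckedException. Unlike the original, which catches Throwable, the sketch also restores the thread's interrupt flag before rethrowing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the blocking "ask with timeout" pattern used by reliableAsk.
final class BlockingAsk {

    // Checked exception standing in for PowerJobCheckedException (assumption).
    static final class AskFailedException extends Exception {
        AskFailedException(Throwable cause) {
            super(cause);
        }
    }

    private BlockingAsk() {}

    // Waits at most timeoutMs for the stage to complete; failures and timeouts become AskFailedException.
    static <T> T await(CompletionStage<T> stage, long timeoutMs) throws AskFailedException {
        try {
            return stage.toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new AskFailedException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new AskFailedException(e);
        }
    }
}
```

The caller sees a single checked exception type regardless of whether the remote side failed, the network was slow, or the thread was interrupted, which matches how reliableMapTask collapses every failure into PowerJobCheckedException.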
onReceiveProcessorMapTaskRequest
tech/powerjob/worker/actors/TaskTrackerActor.java
/**
 * Handler for sub-task map requests
 */
@Handler(path = WTT_HANDLER_MAP_TASK)
public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {
    HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
    if (taskTracker == null) {
        log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
        return null;
    }
    boolean success = false;
    List<TaskDO> subTaskList = Lists.newLinkedList();
    try {
        req.getSubTasks().forEach(originSubTask -> {
            TaskDO subTask = new TaskDO();
            subTask.setTaskName(req.getTaskName());
            subTask.setSubInstanceId(req.getSubInstanceId());
            subTask.setTaskId(originSubTask.getTaskId());
            subTask.setTaskContent(originSubTask.getTaskContent());
            subTaskList.add(subTask);
        });
        success = taskTracker.submitTask(subTaskList);
    } catch (Exception e) {
        log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
    }
    AskResponse response = new AskResponse();
    response.setSuccess(success);
    return response;
}
TaskTrackerActor handles ProcessorMapTaskRequest in onReceiveProcessorMapTaskRequest: it converts the request's subTasks into TaskDO objects and submits them via taskTracker.submitTask, returning the result in an AskResponse. If no HeavyTaskTracker exists for the instance, the handler returns null instead of a response.
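The conversion copies the request-level taskName and subInstanceId onto every sub-task, while taskId and taskContent come from each individual SubTask. A simplified sketch using Java records (Java 16+); `MapRequestHandling` and all nested types are hypothetical stand-ins for ProcessorMapTaskRequest, SubTask and TaskDO.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of onReceiveProcessorMapTaskRequest's conversion step.
final class MapRequestHandling {

    record SubTask(String taskId, byte[] taskContent) {}

    record MapRequest(long instanceId, long subInstanceId, String taskName, List<SubTask> subTasks) {}

    // Minimal stand-in for TaskDO: only the fields the handler copies.
    record TaskRow(String taskId, String taskName, long subInstanceId, byte[] taskContent) {}

    private MapRequestHandling() {}

    // Every row inherits taskName and subInstanceId from the request; id and content come from the SubTask.
    static List<TaskRow> toTaskRows(MapRequest req) {
        List<TaskRow> rows = new ArrayList<>();
        for (SubTask s : req.subTasks()) {
            rows.add(new TaskRow(s.taskId(), req.taskName(), req.subInstanceId(), s.taskContent()));
        }
        return rows;
    }
}
```

One consequence of this design is that all sub-tasks created by a single map call share one task name, so a processor that needs different kinds of sub-tasks calls map once per kind.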
submitTask
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
/**
 * Submit tasks (the Map of MapReduce, or the broadcast of Broadcast). The caller is responsible for the batch size; inserting too much data at once may fail
 *
 * @param newTaskList newly created sub-tasks
 */
public boolean submitTask(List<TaskDO> newTaskList) {
    if (finished.get()) {
        return true;
    }
    if (CollectionUtils.isEmpty(newTaskList)) {
        return true;
    }
    // Basic field initialization (one extra loop is slightly wasteful, but in distributed execution this cost is negligible!)
    newTaskList.forEach(task -> {
        task.setInstanceId(instanceId);
        task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
        task.setFailedCnt(0);
        task.setLastModifiedTime(System.currentTimeMillis());
        task.setCreatedTime(System.currentTimeMillis());
        task.setLastReportTime(-1L);
    });
    log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
    return taskPersistenceService.batchSave(newTaskList);
}
submitTask first fills in bookkeeping fields, such as setting the status to TaskStatus.WAITING_DISPATCH, failedCnt to 0 and lastReportTime to -1, and then persists the tasks via taskPersistenceService.batchSave. If the tracker has already finished, or the list is empty, it returns true without saving anything.
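To make the early returns and field initialization concrete, here is a hypothetical in-memory stand-in: `InMemorySubmit` replaces both HeavyTaskTracker and its persistence layer with a plain list, and its `Task` class keeps only a few of TaskDO's fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory stand-in for HeavyTaskTracker#submitTask plus its persistence layer.
final class InMemorySubmit {

    enum Status { WAITING_DISPATCH }

    // Minimal stand-in for TaskDO.
    static final class Task {
        final String taskId;
        long instanceId;
        Status status;
        int failedCnt;
        long createdTime;
        long lastReportTime;

        Task(String taskId) {
            this.taskId = taskId;
        }
    }

    final long instanceId;
    final AtomicBoolean finished = new AtomicBoolean(false);
    final List<Task> table = new ArrayList<>(); // stands in for the task_info table

    InMemorySubmit(long instanceId) {
        this.instanceId = instanceId;
    }

    boolean submitTask(List<Task> newTasks) {
        if (finished.get() || newTasks == null || newTasks.isEmpty()) {
            return true; // same early returns as the original: report success, store nothing
        }
        long now = System.currentTimeMillis();
        for (Task t : newTasks) {
            t.instanceId = instanceId;
            t.status = Status.WAITING_DISPATCH;
            t.failedCnt = 0;
            t.createdTime = now;
            t.lastReportTime = -1L;
        }
        return table.addAll(newTasks);
    }
}
```

Note what the first early return implies: a map call that arrives after the instance has finished still gets a successful AskResponse, even though nothing is stored.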
batchSave
tech/powerjob/worker/persistence/TaskPersistenceService.java
public boolean batchSave(List<TaskDO> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return true;
    }
    try {
        return execute(() -> taskDAO.batchSave(tasks));
    } catch (Exception e) {
        log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
    }
    return false;
}
private static <T> T execute(SupplierPlus<T> executor) throws Exception {
    return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
}
batchSave saves the tasks via taskDAO.batchSave, wrapped in CommonUtils.executeWithRetry: on an exception it retries 3 times at 100ms intervals (RETRY_TIMES / RETRY_INTERVAL_MS), and returns false if it still fails.
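The body of executeWithRetry is not shown above. A minimal sketch of a fixed-interval retry helper, assuming retryTimes counts total attempts (the real CommonUtils.executeWithRetry may count differently); `Retry` is a hypothetical name, and java.util.concurrent.Callable stands in for PowerJob's SupplierPlus.

```java
import java.util.concurrent.Callable;

// Hypothetical fixed-interval retry helper; assumes retryTimes = total number of attempts.
final class Retry {

    private Retry() {}

    static <T> T executeWithRetry(Callable<T> action, int retryTimes, long intervalMs) throws Exception {
        if (retryTimes < 1) {
            throw new IllegalArgumentException("retryTimes must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < retryTimes) {
                    Thread.sleep(intervalMs); // wait before the next attempt
                }
            }
        }
        throw last; // non-null here: every attempt threw
    }
}
```

Retrying the whole batch is only safe because TaskDAOImpl.batchSave rolls back on failure; otherwise a partially inserted batch followed by a retry could hit duplicate task IDs.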
TaskDAOImpl.batchSave
tech/powerjob/worker/persistence/TaskDAOImpl.java
public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
    String insertSql = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
    boolean originAutoCommitFlag;
    try (Connection conn = connectionFactory.getConnection()) {
        originAutoCommitFlag = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
            for (TaskDO task : tasks) {
                fillInsertPreparedStatement(task, ps);
                ps.addBatch();
            }
            ps.executeBatch();
            return true;
        } catch (Throwable e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(originAutoCommitFlag);
        }
    }
}
TaskDAOImpl.batchSave writes all rows with a single JDBC executeBatch inside a manual transaction and rolls back on any error. There is no explicit commit() call: when the connection was originally in auto-commit mode, restoring it with setAutoCommit(true) in the finally block commits the transaction, as specified by the JDBC Connection.setAutoCommit contract.
Dispatcher
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
/**
 * Periodically scans tasks in the database (at most 100 per query, to limit memory usage) and dispatches those that need to run
 */
protected class Dispatcher implements Runnable {
    // Database query limit: the maximum number of tasks fetched per query
    private static final int DB_QUERY_LIMIT = 100;
    @Override
    public void run() {
        if (finished.get()) {
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        // 1. Get the ProcessorTrackers that can accept tasks
        List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();
        // 2. No ProcessorTracker available: skip this round
        if (availablePtIps.isEmpty()) {
            log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
            return;
        }
        // 3. Dispatch in batches to avoid large queries
        long currentDispatchNum = 0;
        long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
        AtomicInteger index = new AtomicInteger(0);
        // 4. Query the database in a loop for tasks to dispatch
        while (maxDispatchNum > currentDispatchNum) {
            int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
            List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
            currentDispatchNum += needDispatchTasks.size();
            needDispatchTasks.forEach(task -> {
                // Pick the ProcessorTracker address; if the task already carries an address, use it
                String ptAddress = task.getAddress();
                if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
                    ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
                }
                dispatchTask(task, ptAddress);
            });
            // Stop if fewer tasks than requested came back, or the query failed
            if (needDispatchTasks.size() < dbQueryLimit) {
                break;
            }
        }
        log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
    }
}
HeavyTaskTracker schedules the Dispatcher every 5s. Its run method first calls ptStatusHolder.getAvailableProcessorTrackers() to find the ProcessorTrackers that can accept tasks, then repeatedly pulls batches of TaskStatus.WAITING_DISPATCH tasks from the database (at most 100 per query, until about availablePtIps.size() × threadConcurrency × 2 tasks have been handled) and dispatches them round-robin via dispatchTask. Tasks that already carry an address keep it.
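One pass of Dispatcher#run can be simulated in memory to see how maxDispatchNum, the per-query limit and the round-robin index interact. In this hypothetical sketch, `DispatchPlanner` and its `Task` record are stand-ins, a Deque replaces getTaskByStatus, and the EMPTY_ADDRESS check is reduced to a null/empty check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, in-memory mirror of one Dispatcher#run pass; not PowerJob code.
final class DispatchPlanner {

    static final int DB_QUERY_LIMIT = 100;

    // A waiting task; address is null when no ProcessorTracker is pinned yet.
    record Task(String taskId, String address) {}

    private DispatchPlanner() {}

    // Returns taskId -> chosen ProcessorTracker address, consuming tasks from the waiting queue.
    static Map<String, String> plan(Deque<Task> waiting, List<String> availablePts, int threadConcurrency) {
        Map<String, String> assignment = new LinkedHashMap<>();
        if (availablePts.isEmpty()) {
            return assignment; // nothing can be dispatched this round
        }
        long current = 0;
        long max = availablePts.size() * threadConcurrency * 2L;
        AtomicInteger index = new AtomicInteger(0);
        while (max > current) {
            int limit = (int) Math.min(DB_QUERY_LIMIT, max);
            int fetched = 0;
            // Stand-in for getTaskByStatus(instanceId, WAITING_DISPATCH, limit)
            while (fetched < limit && !waiting.isEmpty()) {
                Task t = waiting.poll();
                String addr = t.address();
                if (addr == null || addr.isEmpty()) {
                    addr = availablePts.get(index.getAndIncrement() % availablePts.size()); // round-robin
                }
                assignment.put(t.taskId(), addr);
                fetched++;
            }
            current += fetched;
            if (fetched < limit) {
                break; // fewer rows than requested: nothing left to fetch
            }
        }
        return assignment;
    }
}
```

With 2 ProcessorTrackers and threadConcurrency 5, maxDispatchNum is 2 × 5 × 2 = 20, so one pass dispatches 20 of 35 waiting tasks and the rest wait for the next run. Because the per-query limit is min(100, maxDispatchNum) rather than the remaining quota, a pass can dispatch slightly more than maxDispatchNum when that number exceeds 100 and is not a multiple of it.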
dispatchTask
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
    // 1. Persist: update the database (if the update fails, the task may be executed twice; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // Record the ProcessorTracker that will handle this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }
    // 2. Update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. Initialize the cache
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));
    // 4. Dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());
    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}
dispatchTask first updates the task's status to DISPATCH_SUCCESS_WORKER_UNCHECK and records the ProcessorTracker that will handle it; if that update fails, the task is not sent. It then marks the ProcessorTracker as dispatched, caches a TaskBriefInfo, builds a TaskTrackerStartTaskReq, and sends the start request via TransportUtils.ttStartPtTask.
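The ordering in dispatchTask matters: status and address are written first, and the request is sent only if that write succeeds. A hypothetical in-memory sketch of that ordering; `PersistThenSend`, the `updater` predicate (standing in for taskPersistenceService.updateTask) and the `sent` list (standing in for TransportUtils.ttStartPtTask) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical sketch of dispatchTask's "persist first, then send" ordering; not PowerJob code.
final class PersistThenSend {

    enum Status { WAITING_DISPATCH, DISPATCH_SUCCESS_WORKER_UNCHECK }

    final Map<String, Status> statusTable = new HashMap<>();
    final Map<String, String> addressTable = new HashMap<>();
    final List<String> sent = new ArrayList<>(); // one "taskId@address" entry per start request
    private final BiPredicate<String, String> updater; // simulates whether the DB update succeeds

    PersistThenSend(BiPredicate<String, String> updater) {
        this.updater = updater;
    }

    void dispatch(String taskId, String ptAddress) {
        // 1. Persist status and address; if this fails, do not send anything
        if (!updater.test(taskId, ptAddress)) {
            return;
        }
        statusTable.put(taskId, Status.DISPATCH_SUCCESS_WORKER_UNCHECK);
        addressTable.put(taskId, ptAddress);
        // 2. Only after the write succeeded, send the start request
        sent.add(taskId + "@" + ptAddress);
    }
}
```

The "UNCHECK" part of the status reflects that the TaskTracker does not yet know whether the ProcessorTracker accepted the task; the ProcessorTracker later confirms receipt with a WORKER_RECEIVED report.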
onReceiveTaskTrackerStartTaskReq
tech/powerjob/worker/actors/ProcessorTrackerActor.java
@Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
    Long instanceId = req.getInstanceInfo().getInstanceId();
    // Creating a ProcessorTracker always succeeds
    ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
            instanceId,
            req.getTaskTrackerAddress(),
            () -> new ProcessorTracker(req, workerRuntime));
    TaskDO task = new TaskDO();
    task.setTaskId(req.getTaskId());
    task.setTaskName(req.getTaskName());
    task.setTaskContent(req.getTaskContent());
    task.setFailedCnt(req.getTaskCurrentRetryNums());
    task.setSubInstanceId(req.getSubInstanceId());
    processorTracker.submitTask(task);
}
ProcessorTrackerActor's onReceiveTaskTrackerStartTaskReq gets or creates the ProcessorTracker for the instance, rebuilds a TaskDO from the request, and hands it to processorTracker.submitTask.
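ProcessorTrackerManager.getProcessorTracker receives a Supplier so the tracker is only created when none exists yet. A minimal get-or-create sketch built on ConcurrentHashMap.computeIfAbsent; `TrackerRegistry` and its key format (instanceId plus TaskTracker address) are hypothetical, and the real manager may be organized differently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical get-or-create registry in the spirit of ProcessorTrackerManager.getProcessorTracker.
final class TrackerRegistry<T> {

    private final ConcurrentHashMap<String, T> trackers = new ConcurrentHashMap<>();

    // computeIfAbsent applies the creator at most once per key, even under concurrent calls.
    T getOrCreate(long instanceId, String taskTrackerAddress, Supplier<T> creator) {
        String key = instanceId + "@" + taskTrackerAddress;
        return trackers.computeIfAbsent(key, k -> creator.get());
    }

    int size() {
        return trackers.size();
    }
}
```

Passing a Supplier instead of a ready-made object means the (relatively heavy) ProcessorTracker constructor only runs for the first start request of an instance; later requests reuse the same tracker.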
ProcessorTracker.submitTask
tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
public void submitTask(TaskDO newTask) {
    // Once the ProcessorTracker hits a fatal error, every task submitted here fails immediately, to prevent a deadlock
    // Deadlock analysis: the TT creates a PT, PT creation fails so it cannot report heartbeats; the TT gets no heartbeat for a long time and considers the PT down (it really is), yet cannot pick another available PT to redispatch to, so a deadlock forms. Game over T_T
    if (lethal) {
        ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                .setInstanceId(instanceId)
                .setSubInstanceId(newTask.getSubInstanceId())
                .setTaskId(newTask.getTaskId())
                .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                .setResult(lethalReason)
                .setReportTime(System.currentTimeMillis());
        TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
        return;
    }
    boolean success = false;
    // 1. Fill in fields and submit for execution
    newTask.setInstanceId(instanceInfo.getInstanceId());
    newTask.setAddress(taskTrackerAddress);
    HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
    try {
        threadPool.submit(heavyProcessorRunnable);
        success = true;
    } catch (RejectedExecutionException ignore) {
        log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                instanceId, newTask.getTaskId(), newTask.getTaskName());
    } catch (Exception e) {
        log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
    }
    // 2. Acknowledge successful receipt
    if (success) {
        ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
        reportReq.setInstanceId(instanceId);
        reportReq.setSubInstanceId(newTask.getSubInstanceId());
        reportReq.setTaskId(newTask.getTaskId());
        reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        reportReq.setReportTime(System.currentTimeMillis());
        TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);
        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
    }
}
ProcessorTracker.submitTask wraps the task in a HeavyProcessorRunnable, submits it to the threadPool, and then replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED. If the pool rejects the task because too many tasks are waiting, no receipt is reported; if the ProcessorTracker is in a lethal state, the task is reported as WORKER_PROCESS_FAILED right away.
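The RejectedExecutionException branch only fires when the pool is bounded. A sketch with a single-thread ThreadPoolExecutor and a queue of capacity 1, chosen purely to make rejection easy to trigger; `BoundedSubmit` and its `reports` list (standing in for TransportUtils.ptReportTask) are hypothetical, and PowerJob's actual pool configuration is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of submit-then-acknowledge with a bounded pool; sizes are illustrative assumptions.
final class BoundedSubmit {

    // 1 worker thread, 1 queue slot; the default AbortPolicy throws RejectedExecutionException when both are full.
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
    final List<String> reports = new ArrayList<>(); // stands in for ptReportTask(WORKER_RECEIVED)

    boolean submit(String taskId, Runnable work) {
        try {
            pool.submit(work);
        } catch (RejectedExecutionException e) {
            return false; // no WORKER_RECEIVED report for a rejected task
        }
        reports.add(taskId + ":WORKER_RECEIVED");
        return true;
    }
}
```

Because a rejected task never gets a WORKER_RECEIVED report, the TaskTracker's view of it stays at DISPATCH_SUCCESS_WORKER_UNCHECK, which, per the log message above, leaves room for it to be dispatched to another ProcessorTracker.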
[Figure: PowerJob, a distributed task scheduling framework]

Summary
- MapProcessor extends BasicProcessor and provides a default map method for dispatching sub-tasks. It builds a ProcessorMapTaskRequest and sends it via TransportUtils.reliableMapTask; the receiving TaskTracker persists the sub-tasks with an initial status of TaskStatus.WAITING_DISPATCH;
- HeavyTaskTracker schedules the Dispatcher every 5s. Its run method first calls ptStatusHolder.getAvailableProcessorTrackers() to find the ProcessorTrackers that can accept tasks, then repeatedly pulls batches of TaskStatus.WAITING_DISPATCH tasks from the database and dispatches them round-robin via dispatchTask;
- dispatchTask first updates the status to DISPATCH_SUCCESS_WORKER_UNCHECK and records the ProcessorTracker handling the task, then builds a TaskTrackerStartTaskReq and sends it via TransportUtils.ttStartPtTask;
- ProcessorTracker.submitTask creates a HeavyProcessorRunnable, submits it to the threadPool, and replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED;
- Finally, HeavyProcessorRunnable calls the corresponding processor.process method to execute the actual task.