
PowerJob's MapProcessor Workflow: A Source Code Walkthrough

 Updated: 2024-01-31 08:37:49   Author: codecraft  
This article walks through the source code behind the MapProcessor workflow in PowerJob, for readers who want to see how subtasks are distributed and executed.

This article examines PowerJob's MapProcessor.


MapProcessor

tech/powerjob/worker/core/processor/sdk/MapProcessor.java

public interface MapProcessor extends BasicProcessor {

    Logger log = LoggerFactory.getLogger(MapProcessor.class);

    int RECOMMEND_BATCH_SIZE = 200;

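    /**
     * Dispatch subtasks.
     * @param taskList subtasks; when they run, each one can be retrieved via TaskContext#getSubTask
     * @param taskName subtask name, i.e. the value returned by TaskContext#getTaskName in the subtask processor
     * @throws PowerJobCheckedException thrown when the map call fails
     */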
    default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {

        if (CollectionUtils.isEmpty(taskList)) {
            return;
        }

        TaskDO task = ThreadLocalStore.getTask();
        WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();

        if (taskList.size() > RECOMMEND_BATCH_SIZE) {
            log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
        }

        // Guard against a map task name equal to the root or last task name (it would spawn subtasks endlessly or fail outright)
        if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
            log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically." ,task.getInstanceId() ,taskName , taskName);
            taskName ="X-"+taskName;
        }

        // 1. Build the request
        ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);

        // 2. Send the request reliably (tasks must not be lost, so the ask method is used; failure throws an exception)
        boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);

        if (requestSucceed) {
            log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
        }else {
            throw new PowerJobCheckedException("map failed for task: " + taskName);
        }
    }

    /**
     * Whether the current task is the root task.
     * @return true -> root task / false -> not the root task
     */
    default boolean isRootTask() {
        TaskDO task = ThreadLocalStore.getTask();
        return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
    }
}
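The MapProcessor interface extends BasicProcessor. Its default map method distributes subtasks: it returns early for an empty list, logs a warning when the list exceeds RECOMMEND_BATCH_SIZE (200), renames a task name that collides with the reserved root or last task name, builds a ProcessorMapTaskRequest, and sends it through TransportUtils.reliableMapTask. It also provides isRootTask to check whether the current task is the root task.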
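Because map only warns when a list is larger than RECOMMEND_BATCH_SIZE, a caller may prefer to split large lists before mapping them. The following is a minimal sketch of that idea, not PowerJob code: `BatchSplitter` and `partition` are hypothetical names introduced here, and in a real processor each batch would be passed to `map(batch, taskName)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of PowerJob: splits a list into batches
// no larger than the batch size recommended by MapProcessor.
class BatchSplitter {

    // Mirrors MapProcessor.RECOMMEND_BATCH_SIZE from the listing above.
    static final int RECOMMEND_BATCH_SIZE = 200;

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            ids.add(i);
        }
        // In a real MapProcessor, each batch would be handed to map(batch, taskName).
        for (List<Integer> batch : partition(ids, RECOMMEND_BATCH_SIZE)) {
            System.out.println("batch size = " + batch.size());
        }
    }
}
```

With 450 items and a batch size of 200, the sketch produces batches of 200, 200 and 50 elements.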

ProcessorMapTaskRequest

tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java

@Getter
@NoArgsConstructor
public class ProcessorMapTaskRequest implements PowerSerializable {
    private Long instanceId;
    private Long subInstanceId;
    private String taskName;
    private List<SubTask> subTasks;
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SubTask {
        private String taskId;
        private byte[] taskContent;
    }
    public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {
        this.instanceId = parentTask.getInstanceId();
        this.subInstanceId = parentTask.getSubInstanceId();
        this.taskName = taskName;
        this.subTasks = Lists.newLinkedList();
        subTaskList.forEach(subTask -> {
            // A single Task may call map several times, so IDs must still be unique at thread level
            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
            // Write the class name to ease deserialization
            subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
        });
    }
}

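The ProcessorMapTaskRequest constructor converts subTaskList into a list of SubTask objects. Each SubTask gets an ID of the form parentTaskId.counter, and the user-defined subtask object is serialized (with Kryo) through SerializerUtils.serialize.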
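The ID scheme can be illustrated in isolation. This is a sketch under assumptions: `SubTaskIdSketch` and `nextIds` are hypothetical names, the per-thread counter stands in for ThreadLocalStore.getTaskIDAddr() and is assumed to start at 0, and the parent ID "0" is only an example value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the ID scheme used by ProcessorMapTaskRequest.
class SubTaskIdSketch {

    // Assumption: one counter per worker thread, playing the role of
    // ThreadLocalStore.getTaskIDAddr() in the listing above.
    private static final ThreadLocal<AtomicLong> TASK_ID_ADDR =
            ThreadLocal.withInitial(() -> new AtomicLong(0));

    static List<String> nextIds(String parentTaskId, int count) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Same shape as the original: parent ID, a dot, then the thread-local counter.
            ids.add(parentTaskId + "." + TASK_ID_ADDR.get().getAndIncrement());
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two map calls inside the same task keep drawing from the same counter,
        // so the second call never reuses an ID from the first.
        System.out.println(nextIds("0", 2));
        System.out.println(nextIds("0", 2));
    }
}
```

Because the counter keeps advancing across calls on the same thread, repeated map calls within one task do not collide.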

reliableMapTask

tech/powerjob/worker/common/utils/TransportUtils.java

public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
        try {
            return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
        } catch (Throwable throwable) {
            throw new PowerJobCheckedException(throwable);
        }
    }
    private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
        final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
        final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
        return completionStage
                .toCompletableFuture()
                .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
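reliableMapTask uses reliableAsk to send the request to the taskTracker/mapTask endpoint and blocks for the response; the default timeout is 5 seconds. Any Throwable, including a timeout, is wrapped in a PowerJobCheckedException.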
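The blocking-with-timeout pattern can be sketched with the JDK alone. The names `ReliableAskSketch` and `reliableSend` are hypothetical, a plain `Exception` stands in for PowerJobCheckedException, and the `CompletionStage<Boolean>` stands in for the transporter's response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "ask and wait, fail loudly on timeout".
class ReliableAskSketch {

    // Assumed value, matching the 5-second default described above.
    static final long DEFAULT_TIMEOUT_MS = 5000;

    static boolean reliableSend(CompletionStage<Boolean> response, long timeoutMs) throws Exception {
        try {
            // Block until the response arrives or the timeout elapses.
            return response.toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            // Timeouts, interruptions and remote failures all surface as one checked exception.
            throw new Exception("ask failed", t);
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(reliableSend(CompletableFuture.completedFuture(true), DEFAULT_TIMEOUT_MS));
            // A future that never completes simulates a lost response.
            reliableSend(new CompletableFuture<Boolean>(), 50);
        } catch (Exception e) {
            System.out.println("failed: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

The design choice mirrored here is that the caller never sees a silent loss: either a response arrives in time or an exception is raised.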

onReceiveProcessorMapTaskRequest

tech/powerjob/worker/actors/TaskTrackerActor.java

/**
     * Handler for subtask map requests
     */
    @Handler(path = WTT_HANDLER_MAP_TASK)
    public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {
        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
        if (taskTracker == null) {
            log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
            return null;
        }
        boolean success = false;
        List<TaskDO> subTaskList = Lists.newLinkedList();
        try {
            req.getSubTasks().forEach(originSubTask -> {
                TaskDO subTask = new TaskDO();
                subTask.setTaskName(req.getTaskName());
                subTask.setSubInstanceId(req.getSubInstanceId());
                subTask.setTaskId(originSubTask.getTaskId());
                subTask.setTaskContent(originSubTask.getTaskContent());
                subTaskList.add(subTask);
            });
            success = taskTracker.submitTask(subTaskList);
        }catch (Exception e) {
            log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
        }
        AskResponse response = new AskResponse();
        response.setSuccess(success);
        return response;
    }
TaskTrackerActor handles ProcessorMapTaskRequest in onReceiveProcessorMapTaskRequest: it converts the incoming subTasks into a list of TaskDO objects and submits them through taskTracker.submitTask.

submitTask

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

/**
     * Submit tasks (the Map of MapReduce, the broadcast of Broadcast). The caller is responsible for batchSize; inserting too much data at once may fail.
     *
     * @param newTaskList the new subtasks to add
     */
    public boolean submitTask(List<TaskDO> newTaskList) {
        if (finished.get()) {
            return true;
        }
        if (CollectionUtils.isEmpty(newTaskList)) {
            return true;
        }
        // Basic preprocessing (the extra loop is slightly wasteful, but in distributed execution this cost is negligible)
        newTaskList.forEach(task -> {
            task.setInstanceId(instanceId);
            task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            task.setFailedCnt(0);
            task.setLastModifiedTime(System.currentTimeMillis());
            task.setCreatedTime(System.currentTimeMillis());
            task.setLastReportTime(-1L);
        });
        log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
        return taskPersistenceService.batchSave(newTaskList);
    }
submitTask first fills in several fields, for example setting status to TaskStatus.WAITING_DISPATCH, and then calls taskPersistenceService.batchSave to persist the tasks.

batchSave

tech/powerjob/worker/persistence/TaskPersistenceService.java

public boolean batchSave(List<TaskDO> tasks) {
        if (CollectionUtils.isEmpty(tasks)) {
            return true;
        }
        try {
            return execute(() -> taskDAO.batchSave(tasks));
        }catch (Exception e) {
            log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
        }
        return false;
    }
    private static  <T> T execute(SupplierPlus<T> executor) throws Exception {
        return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
    }
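batchSave persists the tasks through taskDAO.batchSave; on an exception it retries up to 3 times, 100 ms apart, and returns false if the save still fails.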
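The source of CommonUtils.executeWithRetry is not shown in this article, so the following is only a sketch of a retry-with-interval helper under stated assumptions: `RetrySketch` is a hypothetical class, `Callable` stands in for SupplierPlus, and `tryTimes` is treated as the total number of attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a retry helper; not the PowerJob implementation.
class RetrySketch {

    static <T> T executeWithRetry(Callable<T> action, int tryTimes, long intervalMs) throws Exception {
        if (tryTimes <= 0) {
            throw new IllegalArgumentException("tryTimes must be positive");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= tryTimes; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Sleep between attempts, but not after the final one.
                if (attempt < tryTimes) {
                    Thread.sleep(intervalMs);
                }
            }
        }
        // Every attempt failed: rethrow the most recent failure.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        String result = executeWithRetry(() -> {
            // Fail on the first two calls to simulate a transient database error.
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "saved";
        }, 3, 100);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

In batchSave the exception thrown after the last attempt is caught and logged, which is why the caller only ever sees a boolean.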

TaskDAOImpl.batchSave

tech/powerjob/worker/persistence/TaskDAOImpl.java

public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
        String insertSql = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
        boolean originAutoCommitFlag ;
        try (Connection conn = connectionFactory.getConnection()) {
            originAutoCommitFlag = conn.getAutoCommit();
            conn.setAutoCommit(false);
            try ( PreparedStatement ps = conn.prepareStatement(insertSql)) {
                for (TaskDO task : tasks) {
                    fillInsertPreparedStatement(task, ps);
                    ps.addBatch();
                }
                ps.executeBatch();
                return true;
            } catch (Throwable e) {
                conn.rollback();
                throw e;
            } finally {
                conn.setAutoCommit(originAutoCommitFlag);
            }
        }
    }
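TaskDAOImpl.batchSave performs the bulk insert directly with JDBC: it adds every task to a PreparedStatement batch and calls executeBatch, rolling back if anything fails.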

Dispatcher

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

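/**
     * Periodically scans the tasks in the database (at most 100 per query, to limit memory use) and dispatches those that need to run
     */
    protected class Dispatcher implements Runnable {
        // Database query limit: the maximum number of tasks fetched per query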
        private static final int DB_QUERY_LIMIT = 100;
        @Override
        public void run() {
            if (finished.get()) {
                return;
            }
            Stopwatch stopwatch = Stopwatch.createStarted();
            // 1. Get the ProcessorTrackers that can accept tasks
            List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();
            // 2. No ProcessorTracker available, so skip this round
            if (availablePtIps.isEmpty()) {
                log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
                return;
            }
            // 3. Dispatch in batches to avoid large queries
            long currentDispatchNum = 0;
            long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
            AtomicInteger index = new AtomicInteger(0);
            // 4. Query the database in a loop for tasks that need dispatching
            while (maxDispatchNum > currentDispatchNum) {
                int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
                List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
                currentDispatchNum += needDispatchTasks.size();
                needDispatchTasks.forEach(task -> {
                    // Pick the ProcessorTracker address; if the Task already carries an Address, use it
                    String ptAddress = task.getAddress();
                    if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
                        ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
                    }
                    dispatchTask(task, ptAddress);
                });
                // Stop looping when fewer tasks than requested came back, or the query failed
                if (needDispatchTasks.size() < dbQueryLimit) {
                    break;
                }
            }
            log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
        }
    }
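HeavyTaskTracker schedules the Dispatcher every 5 seconds. Its run method first calls ptStatusHolder.getAvailableProcessorTrackers() to find the ProcessorTrackers that can accept tasks. It then repeatedly pulls a batch of tasks in status TaskStatus.WAITING_DISPATCH from the database, up to a per-round cap of (available trackers × thread concurrency × 2), and hands each one to dispatchTask, choosing the target address round-robin unless the task already carries one.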
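The address selection and the per-round cap can be reproduced in a few lines. This is a sketch, not the PowerJob code: `RoundRobinSketch`, `assign` and `maxDispatchNum` are hypothetical names, tasks are reduced to their preset address (null or empty when unset), and the addresses are made-up examples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the Dispatcher's address selection.
class RoundRobinSketch {

    // Same arithmetic as the listing: trackers * concurrency * 2.
    static long maxDispatchNum(int availableTrackers, int threadConcurrency) {
        return availableTrackers * threadConcurrency * 2L;
    }

    // presetAddresses.get(i) is the address already stored on task i, or null/empty when unset.
    static List<String> assign(List<String> presetAddresses, List<String> availableTrackers) {
        int index = 0;
        List<String> result = new ArrayList<>();
        for (String preset : presetAddresses) {
            if (preset == null || preset.isEmpty()) {
                // No preset address: take the next tracker in rotation.
                result.add(availableTrackers.get(index++ % availableTrackers.size()));
            } else {
                // A preset address wins and does not advance the rotation.
                result.add(preset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> trackers = Arrays.asList("10.0.0.1:27777", "10.0.0.2:27777");
        List<String> presets = Arrays.asList(null, "10.0.0.9:27777", "", null);
        System.out.println(assign(presets, trackers));
        System.out.println(maxDispatchNum(trackers.size(), 5)); // 2 * 5 * 2 = 20
    }
}
```

In this run the three unassigned tasks go to the first tracker, the second tracker, and the first tracker again, while the task with a preset address keeps it.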

dispatchTask

tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
        // 1. Persist: update the database (a failed update may cause duplicate execution; not handled for now)
        TaskDO updateEntity = new TaskDO();
        updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
        // Record the ProcessorTracker that will handle this task
        updateEntity.setAddress(processorTrackerAddress);
        boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
        if (!success) {
            log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
            return;
        }
        // 2. Update the ProcessorTrackerStatus
        ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
        // 3. Initialize the cache
        taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));
        // 4. Dispatch the task
        TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
        TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());
        log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
    }
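dispatchTask first updates the task's status to DISPATCH_SUCCESS_WORKER_UNCHECK and records the ProcessorTracker that will handle it. It then builds a TaskTrackerStartTaskReq and sends the request to execute the task through TransportUtils.ttStartPtTask.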

onReceiveTaskTrackerStartTaskReq

tech/powerjob/worker/actors/ProcessorTrackerActor.java

@Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
    public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
        Long instanceId = req.getInstanceInfo().getInstanceId();
        // Creating the ProcessorTracker always succeeds
        ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
                instanceId,
                req.getTaskTrackerAddress(),
                () -> new ProcessorTracker(req, workerRuntime));
        TaskDO task = new TaskDO();
        task.setTaskId(req.getTaskId());
        task.setTaskName(req.getTaskName());
        task.setTaskContent(req.getTaskContent());
        task.setFailedCnt(req.getTaskCurrentRetryNums());
        task.setSubInstanceId(req.getSubInstanceId());
        processorTracker.submitTask(task);
    }
ProcessorTrackerActor.onReceiveTaskTrackerStartTaskReq gets or creates the ProcessorTracker for the instance and then calls its submitTask method to submit the task.

ProcessorTracker.submitTask

tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java

public void submitTask(TaskDO newTask) {

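        // Once the ProcessorTracker is in a fatal state, every task submitted here fails immediately, to prevent a deadlock
        // Deadlock analysis: TT creates PT; PT creation fails, so it cannot report heartbeats; TT receives no heartbeat for a long time and considers PT down (it really is); no available PT can be chosen to redispatch the task, and a deadlock forms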
        if (lethal) {
            ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                    .setInstanceId(instanceId)
                    .setSubInstanceId(newTask.getSubInstanceId())
                    .setTaskId(newTask.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(lethalReason)
                    .setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
            return;
        }

        boolean success = false;
        // 1. Set the fields and submit for execution
        newTask.setInstanceId(instanceInfo.getInstanceId());
        newTask.setAddress(taskTrackerAddress);

        HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(heavyProcessorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }

        // 2. Reply that the task was received
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }
    }
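ProcessorTracker.submitTask creates a HeavyProcessorRunnable and submits it to threadPool. If the pool accepts it, the method replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED. If the tracker is in a fatal state, it reports WORKER_PROCESS_FAILED right away instead.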
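The accept-or-reject behaviour of the thread pool can be seen with a small bounded pool. This is a sketch with hypothetical names (`SubmitSketch`, `trySubmit`); the pool size and queue capacity are chosen only to force a rejection and say nothing about how PowerJob sizes its pool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "submit, and treat rejection as not accepted".
class SubmitSketch {

    // Returns true when the pool accepted the task, false when it was rejected.
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // One worker thread and a queue of one: the third pending task cannot fit.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(trySubmit(pool, blocking)); // true: runs on the only thread
        System.out.println(trySubmit(pool, blocking)); // true: waits in the queue
        System.out.println(trySubmit(pool, blocking)); // false: rejected by the default AbortPolicy
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

In the listing above, a rejected task produces no WORKER_RECEIVED report; the log message states that the task will be dispatched to another ProcessorTracker.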


Summary

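  • The MapProcessor interface extends BasicProcessor. Its default map method distributes subtasks by building a ProcessorMapTaskRequest and sending it through TransportUtils.reliableMapTask; the receiving TaskTracker persists the subtasks with the initial status TaskStatus.WAITING_DISPATCH;
  • HeavyTaskTracker schedules the Dispatcher every 5 seconds. Its run method calls ptStatusHolder.getAvailableProcessorTrackers() to find the ProcessorTrackers that can accept tasks, then repeatedly pulls a batch of tasks in status TaskStatus.WAITING_DISPATCH from the database and passes them to dispatchTask in round-robin order;
  • dispatchTask updates the status to DISPATCH_SUCCESS_WORKER_UNCHECK, records the ProcessorTracker handling the task, builds a TaskTrackerStartTaskReq, and sends the request to execute the task through TransportUtils.ttStartPtTask;
  • ProcessorTracker.submitTask creates a HeavyProcessorRunnable, submits it to threadPool, and then replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED;
  • Finally, HeavyProcessorRunnable calls the corresponding processor.process method to run the actual task.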
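The status changes traced in this walkthrough can be collected into one ordered table. This is only a reading aid built from the listings above: `StatusFlowSketch` is a hypothetical class, and statuses not shown in this article are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical summary of the status changes covered in this article.
class StatusFlowSketch {

    // Maps each status to the place in the walkthrough where it is set, in order.
    static Map<String, String> happyPath() {
        Map<String, String> steps = new LinkedHashMap<>();
        steps.put("WAITING_DISPATCH", "HeavyTaskTracker.submitTask, before batchSave");
        steps.put("DISPATCH_SUCCESS_WORKER_UNCHECK", "HeavyTaskTracker.dispatchTask, before ttStartPtTask");
        steps.put("WORKER_RECEIVED", "ProcessorTracker.submitTask, after the thread pool accepts the task");
        return steps;
    }

    public static void main(String[] args) {
        happyPath().forEach((status, where) -> System.out.println(status + " <- " + where));
    }
}
```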
