欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PowerJob的OmsLogHandler工作流程源碼解析

 更新時(shí)間:2023年12月25日 08:30:42   作者:codecraft  
這篇文章主要為大家介紹了PowerJob的OmsLogHandler工作流程源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下PowerJob的OmsLogHandler

OmsLogHandler

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {
    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;
    // 處理線程,需要通過線程池啟動(dòng)
    public final Runnable logSubmitter = new LogSubmitter();
    // 上報(bào)鎖,只需要一個(gè)線程上報(bào)即可
    private final Lock reportLock = new ReentrantLock();
    // 生產(chǎn)者消費(fèi)者模式,異步上傳日志
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
    // 每次上報(bào)攜帶的數(shù)據(jù)條數(shù)
    private static final int BATCH_SIZE = 20;
    // 本地囤積閾值
    private static final int REPORT_SIZE = 1024;
    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }
    /**
     * 提交日志
     * @param instanceId 任務(wù)實(shí)例ID
     * @param logContent 日志內(nèi)容
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
        if (logQueue.size() > REPORT_SIZE) {
            // 線程的生命周期是個(gè)不可循環(huán)的過程,一個(gè)線程對(duì)象結(jié)束了不能再次start,只能一直創(chuàng)建和銷毀
            new Thread(logSubmitter).start();
        }
        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }
    //......
}
OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue

LogSubmitter

private class LogSubmitter implements Runnable {
        @Override
        public void run() {
            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }
            try {
                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // 當(dāng)前無可用 Server
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }
                List<InstanceLogContent> logs = Lists.newLinkedList();
                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);
                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // 不可靠請(qǐng)求,WEB日志不追求極致
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }
                    }catch (Exception ignore) {
                        break;
                    }
                }
                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }
            }finally {
                reportLock.unlock();
            }
        }
    }
LogSubmitter實(shí)現(xiàn)了Runnable接口,其run方法先通過reportLock加鎖,成功才繼續(xù),它通過serverDiscoveryService.getCurrentServerAddress()獲取當(dāng)前server的地址,若獲取不到則清空logQueue;否則while循環(huán),每次從logQueue拉取InstanceLogContent,放到linkedList,超過BATCH_SIZE(20)則創(chuàng)建WorkerLogReportReq,通過TransportUtils.reportLogs(req, currentServerAddress, transporter)上報(bào),然后清空linkedList,跳出循環(huán)之后再上報(bào)剩下的日志,最后釋放鎖

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }
    public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
        HandlerLocation handlerLocation = new HandlerLocation()
                .setRootPath(rootPath)
                .setMethodPath(handlerPath);
        return new URL()
                .setServerType(serverType)
                .setAddress(Address.fromIpv4(address))
                .setLocation(handlerLocation);
    }
reportLogs先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請(qǐng)求,rootPath為server,handlerPath為reportLog

tell

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }
AkkaTransporter直接使用actorSelection發(fā)送請(qǐng)求

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }

    private &lt;T&gt; CompletionStage&lt;T&gt; post(URL url, PowerSerializable request, Class&lt;T&gt; clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // 獲取遠(yuǎn)程服務(wù)器的HTTP連接
        Future&lt;HttpClientRequest&gt; httpClientRequestFuture = httpClient.request(requestOptions);
        // 轉(zhuǎn)換 -&gt; 發(fā)送請(qǐng)求獲取響應(yīng)
        Future&lt;HttpClientResponse&gt; responseFuture = httpClientRequestFuture.compose(httpClientRequest -&gt;
            httpClientRequest
                .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -&gt; {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // CompletableFuture.get() 時(shí)會(huì)傳遞拋出該異常
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                       host, port, path, statusCode, httpClientResponse.statusMessage()
                        ));
            }

            return httpClientResponse.body().compose(x -&gt; {

                if (clz == null) {
                    return Future.succeededFuture(null);
                }

                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }

                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }
VertxTransporter則使用post方法通過vertx的httpClient發(fā)送請(qǐng)求

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {
        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }
processWorkerLogReport通過processWorkerLogReport0進(jìn)行處理,最后通過monitorService.monitor(event)上報(bào)監(jiān)控

processWorkerLogReport0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

@Override
    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // 這個(gè)效率應(yīng)該不會(huì)拉垮吧...也就是一些判斷 + Map#get 吧...
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }
processWorkerLogReport0通過instanceLogService.submitLogs進(jìn)行上報(bào)

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

/**
     * 提交日志記錄,持久化到本地?cái)?shù)據(jù)庫中
     * @param workerAddress 上報(bào)機(jī)器地址
     * @param logs 任務(wù)實(shí)例運(yùn)行時(shí)日志
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        }catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }
InstanceLogService通過PJThreadPool.LOCAL_DB_POOL線程池進(jìn)行異步,它通過localInstanceLogRepository.saveAll(logList)保存到本地?cái)?shù)據(jù)庫

monitor

tech/powerjob/server/monitor/PowerJobMonitorService.java

public void monitor(Event event) {
        monitors.forEach(m -> m.record(event));
    }
monitor方法遍歷monitors,挨個(gè)執(zhí)行record

LogMonitor

tech/powerjob/server/monitor/monitors/LogMonitor.java

public void record(Event event) {
        MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
        LoggerFactory.getLogger(event.type()).info(event.message());
    }
LogMonitor的record方法通過日志打印event信息

小結(jié)

PowerJob的OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue;logSubmitter主要是執(zhí)行reportLogs,它先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請(qǐng)求,rootPath為server,handlerPath為reportLog;服務(wù)端的processWorkerLogReport通過processWorkerLogReport0進(jìn)行處理(通過localInstanceLogRepository.saveAll(logList)保存到本地?cái)?shù)據(jù)庫),最后通過monitorService.monitor(event)上報(bào)監(jiān)控。

以上就是PowerJob的OmsLogHandler的詳細(xì)內(nèi)容,更多關(guān)于PowerJob的OmsLogHandler的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論