欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PowerJob的OmsLogHandler工作流程源碼解析

 更新時間:2023年12月25日 08:30:42   作者:codecraft  
這篇文章主要為大家介紹了PowerJob的OmsLogHandler工作流程源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

本文主要研究一下PowerJob的OmsLogHandler

OmsLogHandler

tech/powerjob/worker/background/OmsLogHandler.java

@Slf4j
public class OmsLogHandler {
    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;
    // 處理線程,需要通過線程池啟動
    public final Runnable logSubmitter = new LogSubmitter();
    // 上報鎖,只需要一個線程上報即可
    private final Lock reportLock = new ReentrantLock();
    // 生產(chǎn)者消費者模式,異步上傳日志
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
    // 每次上報攜帶的數(shù)據(jù)條數(shù)
    private static final int BATCH_SIZE = 20;
    // 本地囤積閾值
    private static final int REPORT_SIZE = 1024;
    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }
    /**
     * 提交日志
     * @param instanceId 任務(wù)實例ID
     * @param logContent 日志內(nèi)容
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
        if (logQueue.size() > REPORT_SIZE) {
            // 線程的生命周期是個不可循環(huán)的過程,一個線程對象結(jié)束了不能再次start,只能一直創(chuàng)建和銷毀
            new Thread(logSubmitter).start();
        }
        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }
    //......
}
OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue

LogSubmitter

private class LogSubmitter implements Runnable {
        @Override
        public void run() {
            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }
            try {
                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // 當(dāng)前無可用 Server
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }
                List<InstanceLogContent> logs = Lists.newLinkedList();
                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);
                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // 不可靠請求,WEB日志不追求極致
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }
                    }catch (Exception ignore) {
                        break;
                    }
                }
                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }
            }finally {
                reportLock.unlock();
            }
        }
    }
LogSubmitter實現(xiàn)了Runnable接口,其run方法先通過reportLock加鎖,成功才繼續(xù),它通過serverDiscoveryService.getCurrentServerAddress()獲取當(dāng)前server的地址,若獲取不到則清空logQueue;否則while循環(huán),每次從logQueue拉取InstanceLogContent,放到linkedList,超過BATCH_SIZE(20)則創(chuàng)建WorkerLogReportReq,通過TransportUtils.reportLogs(req, currentServerAddress, transporter)上報,然后清空linkedList,跳出循環(huán)之后再上報剩下的日志,最后釋放鎖

reportLogs

tech/powerjob/worker/common/utils/TransportUtils.java

public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
        final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
        transporter.tell(url, req);
    }
    public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
        HandlerLocation handlerLocation = new HandlerLocation()
                .setRootPath(rootPath)
                .setMethodPath(handlerPath);
        return new URL()
                .setServerType(serverType)
                .setAddress(Address.fromIpv4(address))
                .setLocation(handlerLocation);
    }
reportLogs先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請求,rootPath為server,handlerPath為reportLog

tell

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }
AkkaTransporter直接使用actorSelection發(fā)送請求

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }

    private &lt;T&gt; CompletionStage&lt;T&gt; post(URL url, PowerSerializable request, Class&lt;T&gt; clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // 獲取遠(yuǎn)程服務(wù)器的HTTP連接
        Future&lt;HttpClientRequest&gt; httpClientRequestFuture = httpClient.request(requestOptions);
        // 轉(zhuǎn)換 -&gt; 發(fā)送請求獲取響應(yīng)
        Future&lt;HttpClientResponse&gt; responseFuture = httpClientRequestFuture.compose(httpClientRequest -&gt;
            httpClientRequest
                .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -&gt; {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // CompletableFuture.get() 時會傳遞拋出該異常
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                       host, port, path, statusCode, httpClientResponse.statusMessage()
                        ));
            }

            return httpClientResponse.body().compose(x -&gt; {

                if (clz == null) {
                    return Future.succeededFuture(null);
                }

                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }

                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }
VertxTransporter則使用post方法通過vertx的httpClient發(fā)送請求

processWorkerLogReport

tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
    public void processWorkerLogReport(WorkerLogReportReq req) {
        WorkerLogReportEvent event = new WorkerLogReportEvent()
                .setWorkerAddress(req.getWorkerAddress())
                .setLogNum(req.getInstanceLogContents().size());
        try {
            processWorkerLogReport0(req, event);
            event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
        } catch (RejectedExecutionException re) {
            event.setStatus(WorkerLogReportEvent.Status.REJECTED);
        } catch (Throwable t) {
            event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
            log.warn("[WorkerRequestHandler] process worker report failed!", t);
        } finally {
            monitorService.monitor(event);
        }
    }
processWorkerLogReport通過processWorkerLogReport0進行處理,最后通過monitorService.monitor(event)上報監(jiān)控

processWorkerLogReport0

tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

@Override
    protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
        // 這個效率應(yīng)該不會拉垮吧...也就是一些判斷 + Map#get 吧...
        instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
    }
processWorkerLogReport0通過instanceLogService.submitLogs進行上報

submitLogs

tech/powerjob/server/core/instance/InstanceLogService.java

/**
     * 提交日志記錄,持久化到本地數(shù)據(jù)庫中
     * @param workerAddress 上報機器地址
     * @param logs 任務(wù)實例運行時日志
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {

        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        }catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }
InstanceLogService通過PJThreadPool.LOCAL_DB_POOL線程池進行異步,它通過localInstanceLogRepository.saveAll(logList)保存到本地數(shù)據(jù)庫

monitor

tech/powerjob/server/monitor/PowerJobMonitorService.java

public void monitor(Event event) {
        monitors.forEach(m -> m.record(event));
    }
monitor方法遍歷monitors,挨個執(zhí)行record

LogMonitor

tech/powerjob/server/monitor/monitors/LogMonitor.java

public void record(Event event) {
        MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
        LoggerFactory.getLogger(event.type()).info(event.message());
    }
LogMonitor的record方法通過日志打印event信息

小結(jié)

PowerJob的OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue;logSubmitter主要是執(zhí)行reportLogs,它先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請求,rootPath為server,handlerPath為reportLog;服務(wù)端的processWorkerLogReport通過processWorkerLogReport0進行處理(通過localInstanceLogRepository.saveAll(logList)保存到本地數(shù)據(jù)庫),最后通過monitorService.monitor(event)上報監(jiān)控。

以上就是PowerJob的OmsLogHandler的詳細(xì)內(nèi)容,更多關(guān)于PowerJob的OmsLogHandler的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Gson序列化指定忽略字段的三種寫法詳解

    Gson序列化指定忽略字段的三種寫法詳解

    在我們?nèi)粘J褂胘son序列化框架過程中,經(jīng)常會遇到在輸出json字符串時,忽略某些字段,那么在Gson框架中,要想實現(xiàn)這種方式,可以怎么處理呢,本文就來介紹一下
    2021-10-10
  • Spring切面優(yōu)先級與基于xml的AOP實現(xiàn)方法詳解

    Spring切面優(yōu)先級與基于xml的AOP實現(xiàn)方法詳解

    這篇文章主要介紹了Spring切面的優(yōu)先級與基于xml的AOP的詳細(xì)步驟,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-11-11
  • Springboot啟用多個監(jiān)聽端口代碼實例

    Springboot啟用多個監(jiān)聽端口代碼實例

    這篇文章主要介紹了Springboot啟用多個監(jiān)聽端口代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-06-06
  • 淺析JVM垃圾回收的過程

    淺析JVM垃圾回收的過程

    這篇文章主要介紹了JVM垃圾回收的過程,幫助大家更好的理解和學(xué)習(xí)Java中的垃圾回收機制,感興趣的朋友可以了解下
    2020-09-09
  • SpringBoot2.x版本中,使用SpringSession踩的坑及解決

    SpringBoot2.x版本中,使用SpringSession踩的坑及解決

    這篇文章主要介紹了SpringBoot2.x版本中,使用SpringSession踩的坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 解決Tomcat修改get提交請求亂碼問題

    解決Tomcat修改get提交請求亂碼問題

    這篇文章主要介紹了Tomcat修改get提交請求亂碼問題的解決方案,需要的朋友參考下
    2017-04-04
  • java實現(xiàn)隊列queue數(shù)據(jù)結(jié)構(gòu)詳解

    java實現(xiàn)隊列queue數(shù)據(jù)結(jié)構(gòu)詳解

    大家好,本篇文章主要講的是java實現(xiàn)隊列queue數(shù)據(jù)結(jié)構(gòu)詳解,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-02-02
  • jenkins和sonar實現(xiàn)代碼檢測過程詳解

    jenkins和sonar實現(xiàn)代碼檢測過程詳解

    這篇文章主要介紹了jenkins和sonar實現(xiàn)代碼檢測過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-10-10
  • Java中常用的類型轉(zhuǎn)換(推薦)

    Java中常用的類型轉(zhuǎn)換(推薦)

    這篇文章主要介紹了Java中常用的類型轉(zhuǎn)換(推薦)的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2016-06-06
  • java生成圖片驗證碼示例程序

    java生成圖片驗證碼示例程序

    這篇文章主要介紹了java生成圖片驗證碼示例程序,大家參考使用吧
    2013-11-11

最新評論