PowerJob的OmsLogHandler工作流程源碼解析
序
本文主要研究一下PowerJob的OmsLogHandler
OmsLogHandler
tech/powerjob/worker/background/OmsLogHandler.java
@Slf4j
public class OmsLogHandler {
private final String workerAddress;
private final Transporter transporter;
private final ServerDiscoveryService serverDiscoveryService;
// 處理線程,需要通過線程池啟動(dòng)
public final Runnable logSubmitter = new LogSubmitter();
// 上報(bào)鎖,只需要一個(gè)線程上報(bào)即可
private final Lock reportLock = new ReentrantLock();
// 生產(chǎn)者消費(fèi)者模式,異步上傳日志
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
// 每次上報(bào)攜帶的數(shù)據(jù)條數(shù)
private static final int BATCH_SIZE = 20;
// 本地囤積閾值
private static final int REPORT_SIZE = 1024;
public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
this.workerAddress = workerAddress;
this.transporter = transporter;
this.serverDiscoveryService = serverDiscoveryService;
}
/**
* 提交日志
* @param instanceId 任務(wù)實(shí)例ID
* @param logContent 日志內(nèi)容
*/
public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
if (logQueue.size() > REPORT_SIZE) {
// 線程的生命周期是個(gè)不可循環(huán)的過程,一個(gè)線程對(duì)象結(jié)束了不能再次start,只能一直創(chuàng)建和銷毀
new Thread(logSubmitter).start();
}
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
boolean offerRet = logQueue.offer(tuple);
if (!offerRet) {
log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
}
}
//......
}OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue
LogSubmitter
private class LogSubmitter implements Runnable {
@Override
public void run() {
boolean lockResult = reportLock.tryLock();
if (!lockResult) {
return;
}
try {
final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
// 當(dāng)前無(wú)可用 Server
if (StringUtils.isEmpty(currentServerAddress)) {
if (!logQueue.isEmpty()) {
logQueue.clear();
log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
}
return;
}
List<InstanceLogContent> logs = Lists.newLinkedList();
while (!logQueue.isEmpty()) {
try {
InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
logs.add(logContent);
if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
// 不可靠請(qǐng)求,WEB日志不追求極致
TransportUtils.reportLogs(req, currentServerAddress, transporter);
logs.clear();
}
}catch (Exception ignore) {
break;
}
}
if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
TransportUtils.reportLogs(req, currentServerAddress, transporter);
}
}finally {
reportLock.unlock();
}
}
}LogSubmitter實(shí)現(xiàn)了Runnable接口,其run方法先通過reportLock加鎖,成功才繼續(xù),它通過serverDiscoveryService.getCurrentServerAddress()獲取當(dāng)前server的地址,若獲取不到則清空l(shuí)ogQueue;否則while循環(huán),每次從logQueue拉取InstanceLogContent,放到linkedList,超過BATCH_SIZE(20)則創(chuàng)建WorkerLogReportReq,通過TransportUtils.reportLogs(req, currentServerAddress, transporter)上報(bào),然后清空l(shuí)inkedList,跳出循環(huán)之后再上報(bào)剩下的日志,最后釋放鎖
reportLogs
tech/powerjob/worker/common/utils/TransportUtils.java
public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) {
final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address);
transporter.tell(url, req);
}
public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
HandlerLocation handlerLocation = new HandlerLocation()
.setRootPath(rootPath)
.setMethodPath(handlerPath);
return new URL()
.setServerType(serverType)
.setAddress(Address.fromIpv4(address))
.setLocation(handlerLocation);
}reportLogs先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請(qǐng)求,rootPath為server,handlerPath為reportLog
tell
AkkaTransporter
tech/powerjob/remote/akka/AkkaTransporter.java
public void tell(URL url, PowerSerializable request) {
ActorSelection actorSelection = fetchActorSelection(url);
actorSelection.tell(request, null);
}AkkaTransporter直接使用actorSelection發(fā)送請(qǐng)求
VertxTransporter
tech/powerjob/remote/http/vertx/VertxTransporter.java
public void tell(URL url, PowerSerializable request) {
post(url, request, null);
}
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort();
final String path = url.getLocation().toPath();
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.POST)
.setHost(host)
.setPort(port)
.setURI(path);
// 獲取遠(yuǎn)程服務(wù)器的HTTP連接
Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
// 轉(zhuǎn)換 -> 發(fā)送請(qǐng)求獲取響應(yīng)
Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
httpClientRequest
.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.send(JsonObject.mapFrom(request).toBuffer())
);
return responseFuture.compose(httpClientResponse -> {
// throw exception
final int statusCode = httpClientResponse.statusCode();
if (statusCode != HttpResponseStatus.OK.code()) {
// CompletableFuture.get() 時(shí)會(huì)傳遞拋出該異常
throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
host, port, path, statusCode, httpClientResponse.statusMessage()
));
}
return httpClientResponse.body().compose(x -> {
if (clz == null) {
return Future.succeededFuture(null);
}
if (clz.equals(String.class)) {
return Future.succeededFuture((T) x.toString());
}
return Future.succeededFuture(x.toJsonObject().mapTo(clz));
});
}).toCompletionStage();
}VertxTransporter則使用post方法通過vertx的httpClient發(fā)送請(qǐng)求
processWorkerLogReport
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
@Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING)
public void processWorkerLogReport(WorkerLogReportReq req) {
WorkerLogReportEvent event = new WorkerLogReportEvent()
.setWorkerAddress(req.getWorkerAddress())
.setLogNum(req.getInstanceLogContents().size());
try {
processWorkerLogReport0(req, event);
event.setStatus(WorkerLogReportEvent.Status.SUCCESS);
} catch (RejectedExecutionException re) {
event.setStatus(WorkerLogReportEvent.Status.REJECTED);
} catch (Throwable t) {
event.setStatus(WorkerLogReportEvent.Status.EXCEPTION);
log.warn("[WorkerRequestHandler] process worker report failed!", t);
} finally {
monitorService.monitor(event);
}
}processWorkerLogReport通過processWorkerLogReport0進(jìn)行處理,最后通過monitorService.monitor(event)上報(bào)監(jiān)控
processWorkerLogReport0
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
@Override
protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) {
// 這個(gè)效率應(yīng)該不會(huì)拉垮吧...也就是一些判斷 + Map#get 吧...
instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents());
}processWorkerLogReport0通過instanceLogService.submitLogs進(jìn)行上報(bào)
submitLogs
tech/powerjob/server/core/instance/InstanceLogService.java
/**
* 提交日志記錄,持久化到本地?cái)?shù)據(jù)庫(kù)中
* @param workerAddress 上報(bào)機(jī)器地址
* @param logs 任務(wù)實(shí)例運(yùn)行時(shí)日志
*/
@Async(value = PJThreadPool.LOCAL_DB_POOL)
public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
LocalInstanceLogDO y = new LocalInstanceLogDO();
BeanUtils.copyProperties(x, y);
y.setWorkerAddress(workerAddress);
return y;
}).collect(Collectors.toList());
try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
}catch (Exception e) {
log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
}
}InstanceLogService通過PJThreadPool.LOCAL_DB_POOL線程池進(jìn)行異步,它通過localInstanceLogRepository.saveAll(logList)保存到本地?cái)?shù)據(jù)庫(kù)
monitor
tech/powerjob/server/monitor/PowerJobMonitorService.java
public void monitor(Event event) {
monitors.forEach(m -> m.record(event));
}monitor方法遍歷monitors,挨個(gè)執(zhí)行record
LogMonitor
tech/powerjob/server/monitor/monitors/LogMonitor.java
public void record(Event event) {
MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId()));
LoggerFactory.getLogger(event.type()).info(event.message());
}LogMonitor的record方法通過日志打印event信息
小結(jié)
PowerJob的OmsLogHandler提供了submitLog方法,它先判斷l(xiāng)ogQueue大小是否超過REPORT_SIZE(1024),超過則通過異步線程執(zhí)行l(wèi)ogSubmitter;接著將內(nèi)容包裝為InstanceLogContent,然后放入到logQueue;logSubmitter主要是執(zhí)行reportLogs,它先通過easyBuildUrl構(gòu)建URL,再通過transporter.tell(url, req)發(fā)送請(qǐng)求,rootPath為server,handlerPath為reportLog;服務(wù)端的processWorkerLogReport通過processWorkerLogReport0進(jìn)行處理(通過localInstanceLogRepository.saveAll(logList)保存到本地?cái)?shù)據(jù)庫(kù)),最后通過monitorService.monitor(event)上報(bào)監(jiān)控。
以上就是PowerJob的OmsLogHandler的詳細(xì)內(nèi)容,更多關(guān)于PowerJob的OmsLogHandler的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- PowerJob的TimingStrategyHandler工作流程源碼解讀
- PowerJob的IdGenerateService工作流程源碼解讀
- PowerJob LockService方法工作流程源碼解讀
- PowerJob的Evaluator方法工作流程源碼解讀
- PowerJob的DatabaseMonitorAspect源碼流程
- PowerJob的AbstractScriptProcessor實(shí)現(xiàn)類工作流程源碼解讀
- PowerJob的WorkerHealthReporter工作流程源碼解讀
- PowerJob的ServerDiscoveryService工作流程源碼解讀
- PowerJob的ProcessorLoader工作流程源碼解讀
- PowerJob的DispatchStrategy方法工作流程源碼解讀
相關(guān)文章
Spring切面優(yōu)先級(jí)與基于xml的AOP實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Spring切面的優(yōu)先級(jí)與基于xml的AOP的詳細(xì)步驟,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-11-11
Springboot啟用多個(gè)監(jiān)聽端口代碼實(shí)例
這篇文章主要介紹了Springboot啟用多個(gè)監(jiān)聽端口代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
SpringBoot2.x版本中,使用SpringSession踩的坑及解決
這篇文章主要介紹了SpringBoot2.x版本中,使用SpringSession踩的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
java實(shí)現(xiàn)隊(duì)列queue數(shù)據(jù)結(jié)構(gòu)詳解
大家好,本篇文章主要講的是java實(shí)現(xiàn)隊(duì)列queue數(shù)據(jù)結(jié)構(gòu)詳解,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2022-02-02
jenkins和sonar實(shí)現(xiàn)代碼檢測(cè)過程詳解
這篇文章主要介紹了jenkins和sonar實(shí)現(xiàn)代碼檢測(cè)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10

