A source-code walkthrough of the LoggingEventAsyncDisruptorAppender execution flow
Preface
This article mainly takes a look at LoggingEventAsyncDisruptorAppender.
LoggingEventAsyncDisruptorAppender
net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java
public class LoggingEventAsyncDisruptorAppender extends DelegatingAsyncDisruptorAppender<ILoggingEvent, AppenderListener<ILoggingEvent>> {

    /**
     * Set to true if the caller data should be captured before publishing the event
     * to the {@link RingBuffer}
     */
    private boolean includeCallerData;

    protected void prepareForDeferredProcessing(ILoggingEvent event) {
        super.prepareForDeferredProcessing(event);
        if (includeCallerData) {
            event.getCallerData();
        }
    }

    public boolean isIncludeCallerData() {
        return includeCallerData;
    }

    public void setIncludeCallerData(boolean includeCallerData) {
        this.includeCallerData = includeCallerData;
    }
}
LoggingEventAsyncDisruptorAppender extends DelegatingAsyncDisruptorAppender and adds an includeCallerData property. When includeCallerData is true, its prepareForDeferredProcessing calls event.getCallerData(), so the caller data is resolved on the producer thread before the event is published to the RingBuffer.
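As an illustration, here is a minimal programmatic wiring sketch (the demo class, appender name and pattern are hypothetical; a real setup would normally declare the same thing in logback.xml). It attaches a ConsoleAppender as the delegate and enables includeCallerData so the %caller conversion has data to print:

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender;
import org.slf4j.LoggerFactory;

public class AsyncAppenderDemo {
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

        // Delegate appender that actually writes the events.
        PatternLayoutEncoder encoder = new PatternLayoutEncoder();
        encoder.setContext(context);
        encoder.setPattern("%d %thread %level %caller{1} - %msg%n");
        encoder.start();

        ConsoleAppender<ILoggingEvent> console = new ConsoleAppender<>();
        console.setContext(context);
        console.setEncoder(encoder);
        console.start();

        // Async appender: caller data is captured before publishing to the RingBuffer.
        LoggingEventAsyncDisruptorAppender async = new LoggingEventAsyncDisruptorAppender();
        async.setContext(context);
        async.setName("async");
        async.setIncludeCallerData(true);
        async.addAppender(console); // delegate appenders are also started by start()
        async.start();

        Logger root = context.getLogger(Logger.ROOT_LOGGER_NAME);
        root.setLevel(Level.INFO);
        root.addAppender(async);

        root.info("hello from the async disruptor appender");

        async.stop(); // drains the ring buffer before halting the Disruptor
        context.stop();
    }
}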
DelegatingAsyncDisruptorAppender
net/logstash/logback/appender/DelegatingAsyncDisruptorAppender.java
public abstract class DelegatingAsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>>
        extends AsyncDisruptorAppender<Event, Listener>
        implements AppenderAttachable<Event> {

    /**
     * The delegate appenders.
     */
    private final AppenderAttachableImpl<Event> appenders = new AppenderAttachableImpl<>();

    @Override
    protected EventHandler<LogEvent<Event>> createEventHandler() {
        return new DelegatingEventHandler();
    }

    @Override
    public void start() {
        startDelegateAppenders();
        super.start();
    }

    @Override
    public void stop() {
        if (!isStarted()) {
            return;
        }
        super.stop();
        stopDelegateAppenders();
    }

    private void startDelegateAppenders() {
        for (Iterator<Appender<Event>> appenderIter = appenders.iteratorForAppenders(); appenderIter.hasNext();) {
            Appender<Event> appender = appenderIter.next();
            if (appender.getContext() == null) {
                appender.setContext(getContext());
            }
            if (!appender.isStarted()) {
                appender.start();
            }
        }
    }

    private void stopDelegateAppenders() {
        for (Iterator<Appender<Event>> appenderIter = appenders.iteratorForAppenders(); appenderIter.hasNext();) {
            Appender<Event> appender = appenderIter.next();
            if (appender.isStarted()) {
                appender.stop();
            }
        }
    }

    @Override
    public void addAppender(Appender<Event> newAppender) {
        appenders.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<Event>> iteratorForAppenders() {
        return appenders.iteratorForAppenders();
    }

    @Override
    public Appender<Event> getAppender(String name) {
        return appenders.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<Event> appender) {
        return appenders.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        appenders.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<Event> appender) {
        return appenders.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return appenders.detachAppender(name);
    }
}
DelegatingAsyncDisruptorAppender extends AsyncDisruptorAppender and holds an AppenderAttachableImpl of delegate appenders. Its createEventHandler returns a DelegatingEventHandler, its start method calls startDelegateAppenders before super.start(), its stop method calls stopDelegateAppenders after super.stop(), and addAppender registers the given appender with the AppenderAttachableImpl.
DelegatingEventHandler
private class DelegatingEventHandler implements EventHandler<LogEvent<Event>> {

    /**
     * Whether exceptions should be reported with a error status or not.
     */
    private boolean silentError;

    @Override
    public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
        boolean exceptionThrown = false;
        for (Iterator<Appender<Event>> it = appenders.iteratorForAppenders(); it.hasNext();) {
            Appender<Event> appender = it.next();
            try {
                appender.doAppend(logEvent.event);

                /*
                 * Optimization:
                 *
                 * If any of the delegate appenders are instances of OutputStreamAppender or Flushable,
                 * then flush them at the end of the batch.
                 */
                if (endOfBatch) {
                    flushAppender(appender);
                }
            } catch (Exception e) {
                exceptionThrown = true;
                if (!this.silentError) {
                    addError(String.format("Unable to forward event to appender [%s]: %s", appender.getName(), e.getMessage()), e);
                }
            }
        }
        this.silentError = exceptionThrown;
    }

    private void flushAppender(Appender<Event> appender) throws IOException {
        // Similar to #doAppend() - don't flush if appender is stopped
        if (!appender.isStarted()) {
            return;
        }
        if (appender instanceof Flushable) {
            flushAppender((Flushable) appender);
        } else if (appender instanceof OutputStreamAppender) {
            flushAppender((OutputStreamAppender<Event>) appender);
        }
    }

    private void flushAppender(OutputStreamAppender<Event> appender) throws IOException {
        if (!appender.isImmediateFlush()) {
            OutputStream os = appender.getOutputStream();
            if (os != null) {
                os.flush();
            }
        }
    }

    private void flushAppender(Flushable appender) throws IOException {
        appender.flush();
    }
}
DelegatingEventHandler implements the Disruptor EventHandler interface. Its onEvent method iterates over the appenders held by the AppenderAttachableImpl and calls appender.doAppend(logEvent.event) on each; when endOfBatch is true it also calls flushAppender, which flushes Flushable delegates and OutputStreamAppender delegates that are not in immediate-flush mode.
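The endOfBatch flush is a common Disruptor pattern: the handler writes every event but flushes only once per batch, which amortizes the flush cost when the consumer is catching up on a backlog. A distilled, hypothetical version of the pattern (the LineEvent type and handler below are not part of the library):

import com.lmax.disruptor.EventHandler;
import java.io.BufferedWriter;
import java.io.IOException;

// Hypothetical event type, only to isolate the pattern.
class LineEvent {
    String line;
}

class BatchFlushingHandler implements EventHandler<LineEvent> {

    private final BufferedWriter writer;

    BatchFlushingHandler(BufferedWriter writer) {
        this.writer = writer;
    }

    @Override
    public void onEvent(LineEvent event, long sequence, boolean endOfBatch) throws IOException {
        writer.write(event.line);
        writer.newLine();
        if (endOfBatch) {
            // The Disruptor hands the consumer everything currently available as one batch,
            // so a single flush here covers all events written above.
            writer.flush();
        }
    }
}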
AsyncDisruptorAppender
net/logstash/logback/appender/AsyncDisruptorAppender.java
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>>
        extends UnsynchronizedAppenderBase<Event> {

    /**
     * Time in nanos to wait between drain attempts during the shutdown phase
     */
    private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms

    protected static final String APPENDER_NAME_FORMAT = "%1$s";
    protected static final String THREAD_INDEX_FORMAT = "%2$d";
    public static final String DEFAULT_THREAD_NAME_FORMAT = "logback-appender-" + APPENDER_NAME_FORMAT + "-" + THREAD_INDEX_FORMAT;

    public static final int DEFAULT_RING_BUFFER_SIZE = 8192;
    public static final ProducerType DEFAULT_PRODUCER_TYPE = ProducerType.MULTI;
    public static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
    public static final int DEFAULT_DROPPED_WARN_FREQUENCY = 1000;

    private static final RingBufferFullException RING_BUFFER_FULL_EXCEPTION = new RingBufferFullException();
    static {
        RING_BUFFER_FULL_EXCEPTION.setStackTrace(new StackTraceElement[] {
                new StackTraceElement(AsyncDisruptorAppender.class.getName(), "append(..)", null, -1)});
    }

    /**
     * The size of the {@link RingBuffer}.
     * Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}.
     * <p>
     * Must be a positive power of 2.
     */
    private int ringBufferSize = DEFAULT_RING_BUFFER_SIZE;

    /**
     * The {@link ProducerType} to use to configure the Disruptor.
     * Only set to {@link ProducerType#SINGLE} if only one thread
     * will ever be appending to this appender.
     */
    private ProducerType producerType = DEFAULT_PRODUCER_TYPE;

    /**
     * The {@link WaitStrategy} to used by the RingBuffer
     * when pulling events to be processed by {@link #eventHandler}.
     * <p>
     * By default, a {@link BlockingWaitStrategy} is used, which is the most
     * CPU conservative, but results in a higher latency.
     * If you need lower latency (at the cost of higher CPU usage),
     * consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}.
     */
    private WaitStrategy waitStrategy = DEFAULT_WAIT_STRATEGY;

    /**
     * Pattern used by the {@link WorkerThreadFactory} to set the
     * handler thread name.
     * Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
     * <p>
     * If you change the {@link #threadFactory}, then this
     * value may not be honored.
     * <p>
     * The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
     * {@link Formatter#format(String, Object...)} is used to
     * construct the actual thread name prefix.
     * The first argument (%1$s) is the string appender name.
     * The second argument (%2$d) is the numerical thread index.
     * Other arguments can be made available by subclasses.
     */
    private String threadNameFormat = DEFAULT_THREAD_NAME_FORMAT;

    /**
     * When true, child threads created by this appender will be daemon threads,
     * and therefore allow the JVM to exit gracefully without
     * needing to explicitly shut down the appender.
     * Note that in this case, it is possible for log events to not
     * be handled.
     * <p>
     * When false, child threads created by this appender will not be daemon threads,
     * and therefore will prevent the JVM from shutting down
     * until the appender is explicitly shut down.
     * Set this to false if you want to ensure that every log event
     * prior to shutdown is handled.
     * <p>
     * If you change the {@link #threadFactory}, then this
     * value may not be honored.
     */
    private boolean useDaemonThread = true;

    /**
     * When true, if no status listener is registered, then a default {@link OnConsoleStatusListener}
     * will be registered, so that error messages are seen on the console.
     */
    private boolean addDefaultStatusListener = true;

    /**
     * For every droppedWarnFrequency consecutive dropped events, log a warning.
     * Defaults to {@value #DEFAULT_DROPPED_WARN_FREQUENCY}.
     */
    private int droppedWarnFrequency = DEFAULT_DROPPED_WARN_FREQUENCY;

    /**
     * The {@link ThreadFactory} used to create the handler thread.
     */
    private ThreadFactory threadFactory = new WorkerThreadFactory();

    /**
     * The {@link Disruptor} containing the {@link RingBuffer} onto
     * which to publish events.
     */
    private Disruptor<LogEvent<Event>> disruptor;

    /**
     * Sets the {@link LogEvent#event} to the logback Event.
     * Used when publishing events to the {@link RingBuffer}.
     */
    private EventTranslatorOneArg<LogEvent<Event>, Event> eventTranslator = new LogEventTranslator<>();

    /**
     * Defines what happens when there is an exception during
     * {@link RingBuffer} processing.
     */
    private ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();

    /**
     * Consecutive number of dropped events.
     */
    private final AtomicLong consecutiveDroppedCount = new AtomicLong();

    /**
     * The {@link EventFactory} used to create {@link LogEvent}s for the RingBuffer.
     */
    private LogEventFactory<Event> eventFactory = new LogEventFactory<>();

    /**
     * Incrementor number used as part of thread names for uniqueness.
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    /**
     * These listeners will be notified when certain events occur on this appender.
     */
    protected final List<Listener> listeners = new ArrayList<>();

    /**
     * Maximum time to wait when appending events to the ring buffer when full before the event
     * is dropped. Use the following values:
     * <ul>
     * <li>{@code -1} to disable timeout and wait until space becomes available.
     * <li>{@code 0} for no timeout and drop the event immediately when the buffer is full.
     * <li>{@code > 0} to retry during the specified amount of time.
     * </ul>
     */
    private Duration appendTimeout = Duration.buildByMilliseconds(0);

    /**
     * Delay between consecutive attempts to append an event in the ring buffer when
     * full.
     */
    private Duration appendRetryFrequency = Duration.buildByMilliseconds(5);

    /**
     * How long to wait for in-flight events during shutdown.
     */
    private Duration shutdownGracePeriod = Duration.buildByMinutes(1);

    /**
     * Lock used to limit the number of concurrent threads retrying at the same time
     */
    private final ReentrantLock lock = new ReentrantLock();

    //......
}
AsyncDisruptorAppender extends logback's UnsynchronizedAppenderBase and uses a Disruptor RingBuffer for asynchronous processing. The default WaitStrategy is BlockingWaitStrategy, the default ringBufferSize is 8192, the default producerType is ProducerType.MULTI, and the default droppedWarnFrequency is 1000.
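These defaults can be overridden per appender instance. The sketch below assumes the usual logback-style setters derived from the fields above (setRingBufferSize, setProducerType, setDroppedWarnFrequency, setAppendTimeout, setShutdownGracePeriod); verify the exact names against the library version in use:

import ch.qos.logback.core.util.Duration;
import com.lmax.disruptor.dsl.ProducerType;
import net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender;

public class AppenderTuning {

    // Assumed setter names matching the fields shown in AsyncDisruptorAppender.
    static LoggingEventAsyncDisruptorAppender tunedAppender() {
        LoggingEventAsyncDisruptorAppender async = new LoggingEventAsyncDisruptorAppender();
        async.setRingBufferSize(16384);                                      // must be a positive power of 2
        async.setProducerType(ProducerType.MULTI);                           // SINGLE only if one thread ever appends
        async.setDroppedWarnFrequency(500);                                  // warn on every 500th consecutive drop
        async.setAppendTimeout(Duration.buildByMilliseconds(10));            // retry up to 10 ms before dropping
        async.setShutdownGracePeriod(Duration.buildByMilliseconds(30_000));  // allow 30 s to drain on stop()
        return async;
    }
}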
start
public void start() {
    if (addDefaultStatusListener && getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
        LevelFilteringStatusListener statusListener = new LevelFilteringStatusListener();
        statusListener.setLevelValue(Status.WARN);
        statusListener.setDelegate(new OnConsoleStatusListener());
        statusListener.setContext(getContext());
        statusListener.start();
        getStatusManager().add(statusListener);
    }

    this.disruptor = new Disruptor<>(
            this.eventFactory,
            this.ringBufferSize,
            this.threadFactory,
            this.producerType,
            this.waitStrategy);

    /*
     * Define the exceptionHandler first, so that it applies
     * to all future eventHandlers.
     */
    this.disruptor.setDefaultExceptionHandler(this.exceptionHandler);

    this.disruptor.handleEventsWith(new EventClearingEventHandler<>(createEventHandler()));

    this.disruptor.start();
    super.start();
    fireAppenderStarted();
}
The start method creates the Disruptor from eventFactory, ringBufferSize, threadFactory, producerType and waitStrategy, sets the default exception handler, registers an EventClearingEventHandler that wraps the handler returned by createEventHandler, then calls disruptor.start(), super.start() and finally fireAppenderStarted().
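For readers unfamiliar with the Disruptor DSL, the following hypothetical standalone snippet (not library code; StringEvent and the handler lambda are made up for illustration) goes through the same wiring steps as start(): build the Disruptor from an event factory, a power-of-two ring size, a thread factory, a producer type and a wait strategy, register a default exception handler, attach a handler chain, and start it:

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;

public class DisruptorBootstrap {

    // Simple mutable event stored in the pre-allocated ring buffer slots.
    static class StringEvent {
        String value;
    }

    public static void main(String[] args) {
        Disruptor<StringEvent> disruptor = new Disruptor<>(
                StringEvent::new,                 // EventFactory: pre-allocates ring buffer slots
                8192,                             // ring buffer size, must be a power of 2
                Executors.defaultThreadFactory(), // ThreadFactory for the handler thread
                ProducerType.MULTI,               // multiple threads may publish
                new BlockingWaitStrategy());      // CPU-conservative wait strategy

        // The exception handler is set before handlers are registered, so it applies to them.
        disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());

        // Handler chain: here a single consumer printing each event.
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("consumed: " + event.value));

        disruptor.start();

        // Publishing mirrors what the appender's eventTranslator does.
        EventTranslatorOneArg<StringEvent, String> translator =
                (event, sequence, msg) -> event.value = msg;
        disruptor.getRingBuffer().publishEvent(translator, "hello");

        disruptor.shutdown(); // waits for pending events to be processed
    }
}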
stop
public void stop() {
    /*
     * Check super.isStarted() instead of isStarted() because subclasses
     * might override isStarted() to perform other comparisons that we don't
     * want to check here. Those should be checked by subclasses
     * prior to calling super.stop()
     */
    if (!super.isStarted()) {
        return;
    }

    /*
     * Don't allow any more events to be appended.
     */
    super.stop();

    /*
     * Shutdown Disruptor
     *
     * Calling Disruptor#shutdown() will wait until all enqueued events are fully processed,
     * but this waiting happens in a busy-spin. To avoid wasting CPU we wait for at most the configured
     * grace period before asking the Disruptor for an immediate shutdown.
     */
    long deadline = getShutdownGracePeriod().getMilliseconds() < 0
            ? Long.MAX_VALUE
            : System.currentTimeMillis() + getShutdownGracePeriod().getMilliseconds();
    while (!isRingBufferEmpty() && (System.currentTimeMillis() < deadline)) {
        LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN);
    }

    this.disruptor.halt();

    if (!isRingBufferEmpty()) {
        addWarn("Some queued events have not been logged due to requested shutdown");
    }

    fireAppenderStopped();
}
The stop method first calls super.stop() so that no more events can be appended, then computes a deadline from shutdownGracePeriod and parks in a loop while the ring buffer is not empty and the deadline has not passed, and finally calls disruptor.halt(). If events are still queued at that point, a warning is logged.
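Since the handler thread is a daemon by default (useDaemonThread = true), the JVM may exit before queued events are written; stopping the appender, or the whole LoggerContext, on shutdown gives stop() a chance to drain the ring buffer within the grace period. A minimal, hypothetical safeguard:

import ch.qos.logback.classic.LoggerContext;
import org.slf4j.LoggerFactory;

public class ShutdownHookDemo {
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

        // Stopping the context stops all appenders, so the async appender's stop()
        // can drain the ring buffer before disruptor.halt() is called.
        Runtime.getRuntime().addShutdownHook(new Thread(context::stop, "logback-shutdown"));

        LoggerFactory.getLogger(ShutdownHookDemo.class).info("last event before JVM exit");
    }
}

Logback's own <shutdownHook/> configuration element achieves the same effect declaratively.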
append
protected void append(Event event) {
    long startTime = System.nanoTime();

    try {
        prepareForDeferredProcessing(event);
    } catch (RuntimeException e) {
        addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
    }

    try {
        if (enqueue(event)) {
            // Log warning if we had drop before
            //
            long consecutiveDropped = this.consecutiveDroppedCount.get();
            if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
                addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
            }

            // Notify listeners
            //
            fireEventAppended(event, System.nanoTime() - startTime);

        } else {
            // Log a warning status about the failure
            //
            long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
            if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
                addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
            }

            // Notify listeners
            //
            fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
        }
    } catch (ShutdownInProgressException e) {
        // Same message as if Appender#append is called after the appender is stopped...
        addWarn("Attempted to append to non started appender [" + this.getName() + "].");

    } catch (InterruptedException e) {
        // be silent but re-interrupt the thread
        Thread.currentThread().interrupt();
    }
}
The append method first calls prepareForDeferredProcessing and then enqueue. If enqueue succeeds after earlier drops, consecutiveDroppedCount is reset via compareAndSet and a warning reports how many events were lost; if enqueue fails, consecutiveDroppedCount is incremented and a warning is logged on the first drop and then on every droppedWarnFrequency-th consecutive drop (the % droppedWarnFrequency == 1 check).
enqueue
private boolean enqueue(Event event) throws ShutdownInProgressException, InterruptedException {
    // Try enqueue the "normal" way
    //
    if (this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
        return true;
    }

    // Drop event immediately when no retry
    //
    if (this.appendTimeout.getMilliseconds() == 0) {
        return false;
    }

    // Limit retries to a single thread at once to avoid burning CPU cycles "for nothing"
    // in CPU constraint environments.
    //
    long deadline = Long.MAX_VALUE;
    if (this.appendTimeout.getMilliseconds() < 0) {
        lock.lockInterruptibly();
    } else {
        deadline = System.currentTimeMillis() + this.appendTimeout.getMilliseconds();
        if (!lock.tryLock(this.appendTimeout.getMilliseconds(), TimeUnit.MILLISECONDS)) {
            return false;
        }
    }

    // Retry until deadline
    //
    long backoff = 1L;
    long backoffLimit = TimeUnit.MILLISECONDS.toNanos(this.appendRetryFrequency.getMilliseconds());

    try {
        do {
            if (!isStarted()) {
                throw new ShutdownInProgressException();
            }
            if (deadline <= System.currentTimeMillis()) {
                return false;
            }
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            LockSupport.parkNanos(backoff);
            backoff = Math.min(backoff * 2, backoffLimit);
        } while (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event));

        return true;
    } finally {
        lock.unlock();
    }
}
The enqueue method first tries disruptor.getRingBuffer().tryPublishEvent and returns true on success. Otherwise, if appendTimeout is 0 it drops the event and returns false; if appendTimeout is negative it acquires the lock via lockInterruptibly and waits indefinitely; if positive it computes a deadline and calls lock.tryLock with that timeout, so only one thread retries at a time. Inside the lock it keeps calling tryPublishEvent, parking with an exponentially growing backoff capped at appendRetryFrequency, until it succeeds, the deadline passes, the appender is stopped, or the thread is interrupted.
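The core of the retry strategy (park with a doubling backoff, capped at a limit, until success or deadline) is worth isolating. A hypothetical, self-contained distillation, not the library code:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

public final class BoundedBackoff {

    // Retry an action with a doubling park time (capped at maxBackoffNanos)
    // until it succeeds or the timeout elapses.
    public static boolean retryUntil(BooleanSupplier action, long timeoutMillis, long maxBackoffNanos)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        long backoff = 1L;
        while (!action.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            LockSupport.parkNanos(backoff);
            backoff = Math.min(backoff * 2, maxBackoffNanos);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Example: the "publish" only succeeds after roughly 10 ms have elapsed.
        long readyAt = System.currentTimeMillis() + 10;
        boolean ok = retryUntil(() -> System.currentTimeMillis() >= readyAt,
                100, TimeUnit.MILLISECONDS.toNanos(5));
        System.out.println("published: " + ok);
    }
}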
Summary
reactor-logback is no longer maintained (see EOL reactor-logback in 3.3+ #204); the recommended replacement is logstash-logback-encoder. LoggingEventAsyncDisruptorAppender extends DelegatingAsyncDisruptorAppender and mainly uses the includeCallerData property to decide whether to resolve caller data up front; DelegatingAsyncDisruptorAppender extends AsyncDisruptorAppender and implements AppenderAttachable by composing an AppenderAttachableImpl; AsyncDisruptorAppender uses the Disruptor RingBuffer for asynchronous processing, with BlockingWaitStrategy as the default WaitStrategy, 8192 as the default ringBufferSize, ProducerType.MULTI as the default producerType, and 1000 as the default droppedWarnFrequency.