reactor-logback的AsyncAppender執(zhí)行流程源碼解讀
序
本文主要研究一下reactor-logback的AsyncAppender
AsyncAppender
reactor-logback/src/main/java/reactor/logback/AsyncAppender.java
public class AsyncAppender extends ContextAwareBase implements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>, CoreSubscriber<ILoggingEvent> { private final AppenderAttachableImpl<ILoggingEvent> aai = new AppenderAttachableImpl<ILoggingEvent>(); private final FilterAttachableImpl<ILoggingEvent> fai = new FilterAttachableImpl<ILoggingEvent>(); private final AtomicReference<Appender<ILoggingEvent>> delegate = new AtomicReference<Appender<ILoggingEvent>>(); private String name; private WorkQueueProcessor<ILoggingEvent> processor; private int backlog = 1024 * 1024; private boolean includeCallerData = false; private boolean started = false; //...... }
AsyncAppender繼承了ContextAwareBase,同時實(shí)現(xiàn)了Appender、AppenderAttachable、CoreSubscriber接口
CoreSubscriber
reactor/core/CoreSubscriber.java
public interface CoreSubscriber<T> extends Subscriber<T> { /** * Request a {@link Context} from dependent components which can include downstream * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}. * * @return a resolved context or {@link Context#empty()} */ default Context currentContext(){ return Context.empty(); } /** * Implementors should initialize any state used by {@link #onNext(Object)} before * calling {@link Subscription#request(long)}. Should further {@code onNext} related * state modification occur, thread-safety will be required. * <p> * Note that an invalid request {@code <= 0} will not produce an onError and * will simply be ignored or reported through a debug-enabled * {@link reactor.util.Logger}. * * {@inheritDoc} */ @Override void onSubscribe(Subscription s); }
CoreSubscriber繼承了Subscriber接口,Subscriber接口定義了onSubscribe(Subscription s)、onNext、onError、onComplete方法
onSubscribe
public void onSubscribe(Subscription s) { try { doStart(); } catch (Throwable t) { addError(t.getMessage(), t); } finally { started = true; s.request(Long.MAX_VALUE); } } protected void doStart() { }
onSubscribe方法執(zhí)行doStart,標(biāo)記started為true,同時觸發(fā)s.request(Long.MAX_VALUE)
onNext
public void onNext(ILoggingEvent iLoggingEvent) { aai.appendLoopOnAppenders(iLoggingEvent); }
onNext調(diào)用AppenderAttachableImpl的appendLoopOnAppenders方法
onError
public void onError(Throwable t) { addError(t.getMessage(), t); }
onError主要是添加錯誤信息到logback的status
onComplete
public void onComplete() { try { Appender<ILoggingEvent> appender = delegate.getAndSet(null); if (appender != null){ doStop(); appender.stop(); aai.detachAndStopAllAppenders(); } } catch (Throwable t) { addError(t.getMessage(), t); } finally { started = false; } } protected void doStop() { }
onComplete則執(zhí)行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后標(biāo)記started為false
Appender.doAppend
public void doAppend(ILoggingEvent evt) throws LogbackException { if (getFilterChainDecision(evt) == FilterReply.DENY) { return; } evt.prepareForDeferredProcessing(); if (includeCallerData) { evt.getCallerData(); } try { queueLoggingEvent(evt); } catch (Throwable t) { addError(t.getMessage(), t); } } protected void queueLoggingEvent(ILoggingEvent evt) { if (null != delegate.get()) { processor.onNext(evt); } }
doAppend方法先判斷是否需要DENY,是則直接返回,之后主要執(zhí)行queueLoggingEvent,它在delegate不為null時執(zhí)行processor.onNext(evt)
LifeCycle.start
public void start() { startDelegateAppender(); processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger") .bufferSize(backlog) .autoCancel(false) .build(); processor.subscribe(this); } private void startDelegateAppender() { Appender<ILoggingEvent> delegateAppender = delegate.get(); if (null != delegateAppender && !delegateAppender.isStarted()) { delegateAppender.start(); } } public void addAppender(Appender<ILoggingEvent> newAppender) { if (delegate.compareAndSet(null, newAppender)) { aai.addAppender(newAppender); } else { throw new IllegalArgumentException(delegate.get() + " already attached."); } }
start方法執(zhí)行startDelegateAppender,然后創(chuàng)建WorkQueueProcessor(默認(rèn)bufferSize為1024 * 1024
),并subscribe當(dāng)前實(shí)例;addAppender方法會設(shè)置delegate,并往AppenderAttachableImpl添加appender
stop
public void stop() { processor.onComplete(); }
stop方法執(zhí)行processor.onComplete()
小結(jié)
reactor-logback基于WorkQueueProcessor提供了另外一種AsyncAppender,它不是基于BlockingQueue而是基于RingBuffer來實(shí)現(xiàn)的。其onSubscribe方法執(zhí)行doStart,標(biāo)記started為true,同時觸發(fā)s.request(Long.MAX_VALUE);onNext調(diào)用AppenderAttachableImpl的appendLoopOnAppenders方法;onComplete則執(zhí)行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后標(biāo)記started為false;doAppend方法先判斷是否需要DENY,是則直接返回,之后主要執(zhí)行queueLoggingEvent,它在delegate不為null時執(zhí)行processor.onNext(evt)。
以上就是reactor-logback的AsyncAppender執(zhí)行流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于reactor-logback AsyncAppender的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Hadoop環(huán)境配置之hive環(huán)境配置詳解
這篇文章主要介紹了Hadoop環(huán)境配置之hive環(huán)境配置,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12Java使用openOffice對于word的轉(zhuǎn)換及遇到的問題解決
開發(fā)過程中經(jīng)常會使用java將office系列文檔轉(zhuǎn)換為PDF, 一般都使用微軟提供的openoffice+jodconverter 實(shí)現(xiàn)轉(zhuǎn)換文檔,下面這篇文章主要給大家介紹了關(guān)于Java通過openOffice對于word的轉(zhuǎn)換及遇到問題的解決方法,需要的朋友可以參考下2018-09-09springboot實(shí)現(xiàn)敏感字段加密存儲解密顯示功能
這篇文章主要介紹了springboot實(shí)現(xiàn)敏感字段加密存儲,解密顯示,通過mybatis,自定義注解+AOP切面,Base64加解密方式實(shí)現(xiàn)功能,本文通過代碼實(shí)現(xiàn)給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-02-02Netty網(wǎng)絡(luò)編程實(shí)戰(zhàn)之開發(fā)聊天室功能
這篇文章主要為大家詳細(xì)介紹了如何利用Netty實(shí)現(xiàn)聊天室功能,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Netty網(wǎng)絡(luò)編程有一定幫助,需要的可以參考一下2022-10-10java 遍歷Map及Map轉(zhuǎn)化為二維數(shù)組的實(shí)例
這篇文章主要介紹了java 遍歷Map及Map轉(zhuǎn)化為二維數(shù)組的實(shí)例的相關(guān)資料,希望通過本文能幫助到大家,實(shí)現(xiàn)這樣的功能,需要的朋友可以參考下2017-08-08Java中List、Set、Map的區(qū)別和實(shí)現(xiàn)方式示例代碼
這篇文章主要介紹了Java中List、Set、Map的區(qū)別和實(shí)現(xiàn)方式示例代碼,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06