從log4j2到Disruptor詳解
- log4j2實現(xiàn)原理可查看://www.dbjr.com.cn/article/232602.htm
- 文章同樣基于log4j-2.7版本,disruptor-3.3.6
相信看過log4j2的源碼后大家應(yīng)該明白為什么第二代日志性能會提升那么多,這其中最大的功臣莫過于Disruptor并發(fā)編程框架。
下面我們就跟著log4j2來走進(jìn)Disruptor這個神奇的框(wang)架(zhan)
log4j2異步日志簡要回顧
從日志工廠(Log4jLoggerFactory)中獲取日志Logger實例
從日志上下文工廠(Log4jContextFactory)獲取日志上下文
啟用日志上下文(AsyncLoggerContext)
啟動Disruptor(AsyncLoggerDisruptor)
序列號屏障(ProcessingSequenceBarrier)等待序列號發(fā)布
等待策略(WaitStrategy)等待序列號
返回Logger等待序列號(即等待日志寫入)
異步日志(AsyncLogger)寫入
日志內(nèi)容與轉(zhuǎn)化者(RingBufferLogEventTranslator)綁定
Disruptor嘗試發(fā)布轉(zhuǎn)化者tryPublish
RingBuffer嘗試發(fā)布事件tryPublishEvent
獲取下一個可用序號
轉(zhuǎn)化并發(fā)布序號,日志與序號對應(yīng)的事件綁定,并發(fā)布序號
RingBuffer發(fā)布序號,MultiProducerSequencer發(fā)布
等待策略(waitStrategy)喚醒阻塞
Disruptor在log4j2中的應(yīng)用
AsyncLoggerDisruptor
異步日志Disruptor啟動
創(chuàng)建事件工廠EventFactory
計算ringBufferSize:AsyncLogger.RingBufferSize屬性
創(chuàng)建等待策略:AsyncLogger.WaitStrategy屬性
創(chuàng)建守護(hù)線程執(zhí)行器executor
創(chuàng)建異步隊列滿時處理策略AsyncQueueFullPolicy(非Disruptor步驟)
創(chuàng)建Disruptor
- 創(chuàng)建RingBuffer與Disruptor綁定
- RingBuffer根據(jù)生產(chǎn)者類型創(chuàng)建對應(yīng)的實例,例如多生產(chǎn)者:MultiProducerSequencer
- 創(chuàng)建多生產(chǎn)者序號(bufferSize,waitStrategy)
綁定異常句柄(Disruptor.handleExceptionsWith)
綁定事件處理句柄(Disruptor.handleEventsWith)
- 根據(jù)handle列表創(chuàng)建事件處理器createEventProcessors
- RingBuffer為Sequence(MultiProducerSequencer)序列創(chuàng)建序列屏障ProcessingSequenceBarrier
- 創(chuàng)建事件批處理器BatchEventProcessor
- 為事件批處理器綁定異常處理句柄
- 消費者倉庫(consumerRepository)添加消費者,創(chuàng)建事件處理信息EventProcessorInfo添加至消費者信息列表consumerInfos
- RingBuffer添加處理序列號列表processorSequences為序列號閘
- 如果存在序列號屏障,從閘門中移除屏障序列號并標(biāo)識endOfChain為false
啟動Disruptor
- 遍歷消費者倉庫放入執(zhí)行器中執(zhí)行消費者EventProcessorInfo
- 啟動事件批處理器BatchEventProcessor
- 事件批處理器序列號自增1
- 死循環(huán)
- 序列號屏障ProcessingSequenceBarrier等待下個有效序列號,默認(rèn)為超時等待策略,超時會繼續(xù)下輪循環(huán)
- 事件批處理器序列號如果小于等于有效序列號
- 從RingBuffer中按照序列號獲取event事件
- 通知回調(diào)事件句柄eventHandler.onEvent如果當(dāng)前消費下標(biāo)等于有效序列號availableSequence說明是當(dāng)前批次的最后一個消息,endOfBatch為true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
- 事件批處理器序列號設(shè)置為有效序列號
異步日志Disruptor寫入
嘗試發(fā)布tryPublish事件轉(zhuǎn)化器EventTranslator:RingBufferLogEventTranslator
Disruptor獲取RingBuffer嘗試發(fā)布事件tryPublishEvent
序列號獲取下個有效序號,步進(jìn)為1,例如:MultiProducerSequencer.tryNext
游標(biāo)按照步進(jìn)移動
判斷是否有足夠的空間,沒有則拋出InsufficientCapacityException異常
返回有效序列號
轉(zhuǎn)化器轉(zhuǎn)化消息為對應(yīng)有效序列號的事件放入entries
發(fā)布序列號
- 設(shè)置有效序列號至緩存availableBuffer
- 等待策略喚醒阻塞waitStrategy.signalAllWhenBlocking
架構(gòu)及流程
紅色數(shù)字標(biāo)識流程為獲取logger時Disruptor創(chuàng)建消費者流程
黑色數(shù)字標(biāo)識流程為logger寫入日志時Disruptor創(chuàng)建事件并通知消費者流程
RingBuffer對于所有消費者、生產(chǎn)者是同一個實例
- 環(huán)形隊列,dataProvide,數(shù)據(jù)的存儲與提供者
Sequencer:生產(chǎn)者
- 對于所有消費者、生產(chǎn)者(可能是多生產(chǎn)者序列類型對于Multi類型)是同一個實例,包含一個游標(biāo)序列號Sequence
SequenceBarrier:序列號屏障
- 對于所有消費者、生產(chǎn)者也是同一個實例,序列號屏障包含一個等待策略、一個RingBuffer引用、一個游標(biāo)序列號、一個依賴序列號(可能是組序列號類型)
BatchEventProcessor:消費者
- 消費者包含一個RingBuffer引用
- 一個序列號屏障,可以包含多個屏障序列號,默認(rèn)為0個則使用RingBuffer的MultiProducerSequencer的游標(biāo)序列號Sequence
- 一個EventHandler:RingBufferLogEventHandler
- 遍歷EventHandler列表將其封裝為BatchEventProcessor,將其與原始eventHandler、barrier屏障注冊至消費者資源庫consumerRepository。
- 獲取batchEventProcessor序列號默認(rèn)為-1,將其緩存至processorSequences標(biāo)識正在處理,并將processorSequences、disruptor、consumerRepository綁定至EventHandlerGroup。Disruptor啟動遍歷消費者資源庫啟動消費者:BatchEventProcessor
消費者入口
- 消費者消費前先自增本地序列號(即-1+1=0序號),向序列號屏障申請該序列號的消費,默認(rèn)為Timeout策略申請。
- 屏障收到申請waitFor序列號,當(dāng)前屏障游標(biāo)序列號小于申請的消費序列號,等待生產(chǎn)者生產(chǎn)至當(dāng)前序列號,如果超時則拋出異常(本地序列號不更新繼續(xù)重試);如果沒有超時,將屏障的dependentSequence序列號(如果不是非多序列號屏障類型,log4j2使用的是非多序列號屏障,則是屏障的本地游標(biāo))賦值為availableSequence返回。
- 如果availableSequence有效的序列號(即屏障的游標(biāo)序列號)小于申請要消費的序列號直接返回availableSequence(即消費超出的生產(chǎn)的速度,消費者申請的序列號向后回移至有效序列號)。否則getHighestPublishedSequence判斷申請的序列號至availableSequence序列號之間的每個序列號對應(yīng)的消息事件均是有效的則返回有效序列號(即生產(chǎn)者生產(chǎn)很快,消費者申請消費的序列號很小,向前移動至有效的,可能是本身也可能會跳躍多個下標(biāo)),根據(jù)生產(chǎn)者的availableBuffer判斷是否有效,因為生產(chǎn)者先發(fā)布序列號再寫入數(shù)據(jù),此處避免了讀取數(shù)據(jù)異常,如果數(shù)據(jù)沒有寫入,有效序列號緩存標(biāo)識沒有寫入(即無效),消費者會進(jìn)行剛剛所說的“重試”,如果之間存在無效序列號則返回申請序列號-1(即回滾一個值,進(jìn)入邏輯時增加了一個值,也就是回滾至申請前的點,可以理解為與超時相同,即重試)
- 如果申請的序列號小于等于有效的序列號,則消費序列號對應(yīng)的消息事件并更新本地BatchEventProcessor的序列號,按照下標(biāo)去dataProvide(RingBuffer.entries)中提取對應(yīng)位置的數(shù)據(jù)消費
- 如果申請的序列號大于有效的序列號,則將消費者本地序列號設(shè)置為有效序列號(即消費超出的生產(chǎn)的速度,消費的序列號向后回移)
- 如果期間出現(xiàn)任何未catch住的異常則會跳過當(dāng)前下標(biāo),異常出現(xiàn)時的下標(biāo)及對應(yīng)的事件會交由exceptionHandler處理,默認(rèn)為AsyncLoggerDefaultExceptionHandler異步處理,會將異常事件輸出至系統(tǒng)的標(biāo)準(zhǔn)錯誤管道,雖然是異步也是會占用消費者線程池資源
Disruptor:生產(chǎn)者入口
- 獲取RingBuffer嘗試發(fā)布消息,生產(chǎn)者(例如:MultiProducerSequencer)
- 生產(chǎn)者游標(biāo)序列號嘗試自增,判斷當(dāng)前是否有足夠的空間,當(dāng)前游標(biāo)+步進(jìn)-bufferSize是否大于最小的閘門序列號(gatingSequences,即:所有消費者的本地游標(biāo)序列號processorSequences列表),最小序列號會緩存至本地gatingSequenceCache用于下次判斷減少進(jìn)行所有閘門序列號的遍歷次數(shù),如果是說明已經(jīng)沒有空間(因為生產(chǎn)者生產(chǎn)申請的序列號已經(jīng)追上了消費者消費序列號的最小值。RingBuffer是一個環(huán)形隊列結(jié)構(gòu)。上面已經(jīng)講到消費者序列號會與生產(chǎn)者序列號同步,同步指消費者申請序列號小于有效序列號時會前進(jìn)至有效序列號,即使有延遲也保證了有大于等于buffer值的緩沖空間供生產(chǎn)者生產(chǎn)),如果沒有空間返回false進(jìn)入下一輪生產(chǎn)
- 自增成功后,將消息轉(zhuǎn)化為對應(yīng)序列號下標(biāo)位置的事件數(shù)據(jù)
- Sequencer發(fā)布序列號,將當(dāng)前序列號設(shè)置為有效(availableBuffer),并根據(jù)等待策略喚醒等待的消費者,被喚醒的消費者根據(jù)發(fā)布的序列號獲取相應(yīng)下標(biāo)處事件數(shù)據(jù)進(jìn)行處理
Disruptor為什么這么快?
Disruptor采用無鎖并發(fā)編程,框架中主要使用CAS與volatile關(guān)鍵字保證并發(fā)安全
使用環(huán)形數(shù)據(jù)結(jié)構(gòu)(另一個典型的應(yīng)用是時鐘算法也是使用的環(huán)形數(shù)據(jù)結(jié)構(gòu)),為環(huán)形結(jié)構(gòu)添加序列號屏障來控制對環(huán)形隊列讀寫操作,保證存儲數(shù)據(jù)的并發(fā)安全
另一個點便是神奇的緩沖行填充了
Log4j2為什么這么快?
使用Disruptor并發(fā)編程框架
使用NIO寫入日志數(shù)據(jù)
當(dāng)然log4j2中有很多細(xì)節(jié),如果我們想要獲取線程棧信息,可以同樣學(xué)習(xí)一下這樣的寫法
// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace(). final StackTraceElement[] stackTrace = new Throwable().getStackTrace(); StackTraceElement last = null; for (int i = stackTrace.length - 1; i > 0; i--) { final String className = stackTrace[i].getClassName(); if (fqcnOfLogger.equals(className)) { return last; } last = stackTrace[i]; }
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA設(shè)置字體隨鼠標(biāo)滾動放大縮小的實現(xiàn)
這篇文章主要介紹了IDEA設(shè)置字體隨鼠標(biāo)滾動放大縮小的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01sonar-scanner連接sonarquebe7的sonar.java.binaries問題的解決方案
今天小編就為大家分享一篇關(guān)于sonar-scanner連接sonarquebe7的sonar.java.binaries問題的解決方案,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12java開發(fā)之基于Validator接口的SpringMVC數(shù)據(jù)校驗方式
這篇文章主要介紹了java開發(fā)之基于Validator接口的SpringMVC數(shù)據(jù)校驗方式,文中附含詳細(xì)示例代碼,有需要的朋友可以借鑒參考下2021-09-09Java中生成隨機數(shù)的實現(xiàn)方法總結(jié)
這篇文章主要介紹了Java中生成隨機數(shù)的實現(xiàn)方法總結(jié),其中多線程并發(fā)的實現(xiàn)方式尤為exciting,需要的朋友可以參考下2015-11-11idea中使用maven?archetype新建項目時卡住問題解決方案
這篇文章主要介紹了idea中使用maven?archetype新建項目時卡住,解決本問題的方法,就是在maven的runner加上參數(shù)-DarchetypeCatalog=local就可以了,不需要下載xml文件再放到指定目錄,需要的朋友可以參考下2023-08-08