java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解
Java中有哪些隊(duì)列
ArrayBlockingQueue
使用ReentrantLockLinkedBlockingQueue
使用ReentrantLockConcurrentLinkedQueue
使用CAS
等等
我們清楚使用鎖的性能比較低,盡量使用無鎖設(shè)計(jì)。接下來就我們來認(rèn)識(shí)下
Disruptor簡(jiǎn)單使用
github地址:github.com/LMAX-Exchan…
先簡(jiǎn)單介紹下:
- Disruptor它是一個(gè)開源的并發(fā)框架,并獲得2011 Duke’s程序框架創(chuàng)新獎(jiǎng)【Oracle】,能夠在無鎖的情況下實(shí)現(xiàn)網(wǎng)絡(luò)的Queue并發(fā)操作。英國(guó)外匯交易公司LMAX開發(fā)的一個(gè)高性能隊(duì)列,號(hào)稱單線程能支撐每秒600萬訂單~
- 日志框架Log4j2 異步模式采用了Disruptor來處理
- 局限呢,他就是個(gè)內(nèi)存隊(duì)列,也就是說無法支撐分布式場(chǎng)景。
簡(jiǎn)單使用
數(shù)據(jù)傳輸對(duì)象
@Data public class EventData { private Long value; }
消費(fèi)者
public class EventConsumer implements WorkHandler<EventData> { /** * 消費(fèi)回調(diào) * @param eventData * @throws Exception */ @Override public void onEvent(EventData eventData) throws Exception { Thread.sleep(5000); System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue()); } }
生產(chǎn)者
public class EventProducer { private final RingBuffer<EventData> ringBuffer; public EventProducer(RingBuffer<EventData> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(Long v){ // cas展位 long next = ringBuffer.next(); try { EventData eventData = ringBuffer.get(next); eventData.setValue(v); } finally { // 通知等待的消費(fèi)者 System.out.println("EventProducer send success, sequence:"+next); ringBuffer.publish(next); } } }
測(cè)試類
public class DisruptorTest { public static void main(String[] args) { // 2的n次方 int bufferSize = 8; Disruptor<EventData> disruptor = new Disruptor<EventData>( () -> new EventData(), // 事件工廠 bufferSize, // 環(huán)形數(shù)組大小 Executors.defaultThreadFactory(), // 線程池工廠 ProducerType.MULTI, // 支持多事件發(fā)布者 new BlockingWaitStrategy()); // 等待策略 // 設(shè)置消費(fèi)者 disruptor.handleEventsWithWorkerPool( new EventConsumer(), new EventConsumer(), new EventConsumer(), new EventConsumer()); disruptor.start(); RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer(); EventProducer eventProducer = new EventProducer(ringBuffer); long i = 0; for(;;){ i++; eventProducer.sendData(i); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
核心組件
基于上面簡(jiǎn)單例子來看確實(shí)很簡(jiǎn)單,Disruptor幫我們封裝好了生產(chǎn)消費(fèi)模型的實(shí)現(xiàn),接下來我們來看下他是基于哪些核心組件來支撐起一個(gè)高性能無鎖隊(duì)列呢?
RingBuffer: 環(huán)形數(shù)組,底層使用數(shù)組entries,在初始化時(shí)填充數(shù)組,避免不斷新建對(duì)象帶來的開銷。后續(xù)只會(huì)對(duì)entries做更新操作
Sequencer: 核心管家
- 定義生產(chǎn)同步的實(shí)現(xiàn):
SingleProducerSequencer
單生產(chǎn)、MultiProducerSequencer
多生產(chǎn) - 當(dāng)前寫的進(jìn)度Sequence cursor
- 所有消費(fèi)者進(jìn)度的數(shù)組
Sequence[] gatingSequences
MultiProducerSequencer
可用區(qū)availableBuffer
【利用空間換取查詢效率】
Sequence: 本身就是一個(gè)序號(hào)器用來標(biāo)識(shí)處理進(jìn)度,也可以當(dāng)做是一個(gè)atomicInteger
; 還有另外一個(gè)特點(diǎn),為了解決偽共享問題而引入的:緩存行填充。這個(gè)在后面介紹。
workProcessor: 處理Event的循環(huán),在循環(huán)中獲取Disruptor的事件,然后把事件分配給各個(gè)handler
EventHandler: 負(fù)責(zé)業(yè)務(wù)邏輯的handler,自己實(shí)現(xiàn)。
WaitStrategy: 消費(fèi)者 如何等待 事件的策略,定義了如下策略
leepingWaitStrategy
:自旋 + yield + sleep
BlockingWaitStrategy
:加鎖,適合CPU資源緊張(不需要切換線程),系統(tǒng)吞吐量無要求的
YieldingWaitStrategy
:自旋 + yield + 自旋
BusySpinWaitStrategy
:自旋,減少線程之前切換
PhasedBackoffWaitStrategy
:自旋 + yield + 自定義策略
帶著問題來解析代碼?
1、多生產(chǎn)者如何保證消息生產(chǎn)不會(huì)相互覆蓋。【如何達(dá)到互斥效果】
每個(gè)線程獲取不同的一段數(shù)組空間,然后通過CAS判斷這段空間是否已經(jīng)分配出去。
接下來我們看下多生產(chǎn)類MultiProducerSequencer
中next方法【獲取生產(chǎn)序號(hào)】
// 消費(fèi)者上一次消費(fèi)的最小序號(hào) // 后續(xù)第二點(diǎn)會(huì)講到 private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 當(dāng)前進(jìn)度的序號(hào) protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 所有消費(fèi)者的序號(hào) //后續(xù)第二點(diǎn)會(huì)講到 protected volatile Sequence[] gatingSequences = new Sequence[0]; public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { // 當(dāng)前進(jìn)度的序號(hào),Sequence的value具有可見性,保證多線程間線程之間能感知到可申請(qǐng)的最新值 current = cursor.get(); // 要申請(qǐng)的序號(hào)空間:最大序列號(hào) next = current + n; long wrapPoint = next - bufferSize; // 消費(fèi)者最小序列號(hào) long cachedGatingSequence = gatingSequenceCache.get(); // 大于一圈 || 最小消費(fèi)序列號(hào)>當(dāng)前進(jìn)度 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); // 說明大于1圈,并沒有多余空間可以申請(qǐng) if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } // 更新最小值到Sequence的value中 gatingSequenceCache.set(gatingSequence); } // CAS成功后更新當(dāng)前Sequence的value else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
2、生產(chǎn)者向序號(hào)器申請(qǐng)寫的序號(hào),如序號(hào)正在被消費(fèi),Sequencer是如何知道哪些序號(hào)是可以被寫入的呢?【未消費(fèi)則被覆蓋如何處理】
從gatingSequences中取得最小的序號(hào),生產(chǎn)者最多能寫到這個(gè)序號(hào)的后一位。通俗來講就是申請(qǐng)的序號(hào)不能大于最小消費(fèi)者序號(hào)一圈【申請(qǐng)到最大序列號(hào)-buffersize 要小于/等于 最小消費(fèi)的序列號(hào)】的時(shí)候, 才能申請(qǐng)到當(dāng)前寫的序號(hào)
public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); } EventHandlerGroup<T> createWorkerPool( final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); final Sequence[] workerSequences = workerPool.getWorkerSequences(); updateGatingSequencesForNextInChain(barrierSequences, workerSequences); return new EventHandlerGroup<>(this, consumerRepository, workerSequences); } private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) { if (processorSequences.length > 0) { // 消費(fèi)者啟動(dòng)后就會(huì)將所有消費(fèi)者存放入AbstractSequencer中g(shù)atingSequences ringBuffer.addGatingSequences(processorSequences); for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } }
3、在多生產(chǎn)者情況下,生產(chǎn)者是申請(qǐng)到一段可寫入的序號(hào),然后再寫入這些序號(hào)中,那么消費(fèi)者是如何感知哪些序號(hào)是可以被消費(fèi)的呢?【借問提1圖說明】
這個(gè)前提是多生產(chǎn)者情況下,第一點(diǎn)我們說過每個(gè)線程獲取不同的一段數(shù)組空間,那么現(xiàn)在單單通過序號(hào)已經(jīng)不夠用了,MultiProducerSequencer
使用了int 數(shù)組 【availableBuffer
】來標(biāo)識(shí)當(dāng)前序號(hào)是否可用。當(dāng)生產(chǎn)者成功生產(chǎn)事件后會(huì)將availableBuffer
中當(dāng)前序列號(hào)置為1標(biāo)識(shí)可以讀取。
如此消費(fèi)者可以讀取的的最大序號(hào)就是我們availableBuffer
中第一個(gè)不可用序號(hào)-1。
初始化availableBuffer
流程
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); // 初始化可用數(shù)組 availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer(); } // 初始化默認(rèn)availableBuffer為-1 private void initialiseAvailableBuffer() { for (int i = availableBuffer.length - 1; i != 0; i--) { setAvailableBufferValue(i, -1); } setAvailableBufferValue(0, -1); } // 生產(chǎn)者成功生產(chǎn)事件將可用區(qū)數(shù)組置為1 public void publish(final long sequence) { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
消費(fèi)者消費(fèi)流程
WorkProcessor類中消費(fèi)run方法 public void run() { boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // 先通過cas獲取消費(fèi)事件的占有權(quán) if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } // 數(shù)據(jù)就緒,可以消費(fèi) if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); // 觸發(fā)回調(diào)函數(shù) workHandler.onEvent(event); processedSequence = true; } else { // 獲取可以被讀取的下標(biāo) cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } // ....省略 } notifyShutdown(); running.set(false); } public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); // 這個(gè)值獲取的current write 下標(biāo),可以認(rèn)為全局消費(fèi)下標(biāo)。此處與每一段的write1和write2下標(biāo)區(qū)分開 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 通過availableBuffer篩選出第一個(gè)不可用序號(hào) -1 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } public long getHighestPublishedSequence(long lowerBound, long availableSequence) { // 從current read下標(biāo)開始, 循環(huán)至 current write,如果碰到availableBuffer 為-1 直接返回 for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { if (!isAvailable(sequence)) { return sequence - 1; } } return availableSequence; }
解決偽共享問題
什么是偽共享問題呢?
為了提高CPU的速度,Cpu有高速緩存Cache,該緩存最小單位為緩存行CacheLine,他是從主內(nèi)存復(fù)制的Cache的最小單位,通常是64字節(jié)。一個(gè)Java的long類型是8字節(jié),因此在一個(gè)緩存行中可以存8個(gè)long類型的變量。如果你訪問一個(gè)long數(shù)組,當(dāng)數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)額外加載另外7個(gè)。因此你能非??斓乇闅v這個(gè)數(shù)組。
偽共享問題是指,當(dāng)多個(gè)線程共享某份數(shù)據(jù)時(shí),線程1可能拉到線程2的數(shù)據(jù)在其cache line中,此時(shí)線程1修改數(shù)據(jù),線程2取其數(shù)據(jù)時(shí)就要重新從內(nèi)存中拉取,兩個(gè)線程互相影響,導(dǎo)致數(shù)據(jù)雖然在cache line中,每次卻要去內(nèi)存中拉取。
Disruptor是如何解決的呢?
在value前后統(tǒng)一都加入7個(gè)Long類型進(jìn)行填充,線程拉取時(shí),不論如何都會(huì)占滿整個(gè)緩存
回顧總結(jié):Disuptor為何能稱之為高性能的無鎖隊(duì)列框架呢?
- 緩存行填充,避免緩存頻繁失效。【java8中也引入
@sun.misc.Contended
注解來避免偽共享】 - 無鎖競(jìng)爭(zhēng):通過CAS 【二階段提交】
- 環(huán)形數(shù)組:數(shù)據(jù)都是覆蓋,避免GC
- 底層更多的使用位運(yùn)算來提升效率
以上就是java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解的詳細(xì)內(nèi)容,更多關(guān)于java Disruptor構(gòu)建內(nèi)存隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解如何在SpringBoot中優(yōu)雅地重試調(diào)用第三方API
作為后端程序員,我們的日常工作就是調(diào)用一些第三方服務(wù),將數(shù)據(jù)存入數(shù)據(jù)庫,返回信息給前端。本文為大家介紹了如何在SpringBoot中優(yōu)雅地重試調(diào)用第三方API,需要的可以參考一下2022-12-12Java設(shè)計(jì)模式之迭代模式(Iterator模式)介紹
這篇文章主要介紹了Java設(shè)計(jì)模式之迭代模式(Iterator模式)介紹,本文用一個(gè)老師點(diǎn)名的現(xiàn)象描述了迭代模式的使用,需要的朋友可以參考下2015-03-03Mybatis使用typeHandler加密的實(shí)現(xiàn)
本文詳細(xì)介紹了如何在Mybatis中使用typeHandler對(duì)特定字段進(jìn)行加密處理,涵蓋了從引入依賴、配置Mybatis,到實(shí)現(xiàn)typeHandler繼承類和配置mapper層的詳細(xì)步驟,為需要在項(xiàng)目中實(shí)現(xiàn)字段加密的開發(fā)者提供了參考和借鑒2024-09-09Spring Boot實(shí)現(xiàn)qq郵箱驗(yàn)證碼注冊(cè)和登錄驗(yàn)證功能
這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)qq郵箱驗(yàn)證碼注冊(cè)和登錄驗(yàn)證功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12Java的DataInputStream和DataOutputStream數(shù)據(jù)輸入輸出流
這里我們來看一下Java的DataInputStream和DataOutputStream數(shù)據(jù)輸入輸出流的使用示例,兩個(gè)類分別繼承于FilterInputStream和FilterOutputStream:2016-06-06Java線程讓步y(tǒng)ield用法實(shí)例分析
這篇文章主要介紹了Java線程讓步y(tǒng)ield用法,結(jié)合實(shí)例形式分析了java中yield()方法的功能、原理及線程讓步操作的相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2019-09-09