Java集合之Disruptor操作示例
Disruptor簡(jiǎn)介
Disruptor 是一個(gè)開源的高性能內(nèi)存隊(duì)列,由英國(guó)外匯交易公司 LMAX 開發(fā)的,獲得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 選擇大獎(jiǎng))。
Disruptor 提供的功能類似于 Kafka、RocketMQ 這類分布式隊(duì)列,不過(guò),其作為范圍是 JVM(內(nèi)存),Disruptor 解決了 JDK 內(nèi)置線程安全隊(duì)列的性能和內(nèi)存安全問(wèn)題,Disruptor 有個(gè)最大的優(yōu)點(diǎn)就是快
Disruptor被設(shè)計(jì)用于在生產(chǎn)者—消費(fèi)者(producer-consumer problem,簡(jiǎn)稱PCP)問(wèn)題上獲得盡量高的吞吐量(TPS)和盡量低的延遲Disruptor是LMAX在線交易平臺(tái)的關(guān)鍵組成部分,LMAX平臺(tái)使用該框架對(duì)訂單處理速度能達(dá)到600萬(wàn)TPS,除金融領(lǐng)域之外,其他一般的應(yīng)用中都可以用到 Disruptor,它可以帶來(lái)顯著的性能提升。其實(shí) Disruptor 與其說(shuō)是一個(gè)框架,不如說(shuō)是一種設(shè)計(jì)思路,這個(gè)設(shè)計(jì)思路對(duì)于存在并發(fā)、緩沖區(qū)、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理這些元素的程序來(lái)說(shuō),Disruptor 提出了一種大幅提升性能(TPS)的方案。
github 地址
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Java中線程安全隊(duì)列
JDK 中常見(jiàn)的線程安全的隊(duì)列如下:
| 隊(duì)列名字 | 鎖 | 是否有界 |
|---|---|---|
| ArrayBlockingQueue | 加鎖(ReentrantLock) | 有界 |
| LinkedBlockingQueue | 加鎖(ReentrantLock) | 有界 |
| LinkedTransferQueue | 無(wú)鎖(CAS) | 無(wú)界 |
| ConcurrentLinkedQueue | 無(wú)鎖(CAS) | 無(wú)界 |
從上表中可以看出:這些隊(duì)列要不就是加鎖有界,要不就是無(wú)鎖無(wú)界。而加鎖的的隊(duì)列勢(shì)必會(huì)影響性能,無(wú)界的隊(duì)列又存在內(nèi)存溢出的風(fēng)險(xiǎn)。
因此,一般情況下,我們都是不建議使用 JDK 內(nèi)置線程安全隊(duì)列。Disruptor 就不一樣了!它在無(wú)鎖的情況下還能保證隊(duì)列有界,并且還是線程安全的。
Disruptor 核心概念
Disruptor 核心概念:
Event:可以把Event理解為存放在隊(duì)列中等待消費(fèi)的消息對(duì)象。
在Disruptor的語(yǔ)義中,生產(chǎn)者和消費(fèi)者之間進(jìn)行交換的數(shù)據(jù)被稱為事件(Event)。它不是一個(gè)被Disruptor定義的特定類型,而是由Disruptor的使用者定義并指定。EventFactory:事件工廠用于生產(chǎn)事件,我們?cè)诔跏蓟?Disruptor類的時(shí)候需要用到。EventHandler:Event在對(duì)應(yīng)的Handler中被處理,你可以將其理解為生產(chǎn)消費(fèi)者模型中的消費(fèi)者。Disruptor定義的事件處理接口,由用戶實(shí)現(xiàn),用于處理事件,是Consumer的真正實(shí)現(xiàn)EventProcessor:EventProcessor持有特定消費(fèi)者(Consumer)的Sequence,并提供用于調(diào)用事件處理實(shí)現(xiàn)的事件循環(huán)(Event Loop)Disruptor:事件的生產(chǎn)和消費(fèi)需要用到Disruptor對(duì)象。RingBuffer:RingBuffer(環(huán)形數(shù)組)用于保存事件。
如其名,環(huán)形的緩沖區(qū)。曾經(jīng)RingBuffer是Disruptor中的最主要的對(duì)象,但從3.0版本開始,其職責(zé)被簡(jiǎn)化為僅僅負(fù)責(zé)對(duì)通過(guò)Disruptor進(jìn)行交換的數(shù)據(jù)(事件)進(jìn)行存儲(chǔ)和更新。在一些更高級(jí)的應(yīng)用場(chǎng)景中,Ring Buffer可以由用戶的自定義實(shí)現(xiàn)來(lái)完全替代。WaitStrategy:等待策略。決定了沒(méi)有事件可以消費(fèi)的時(shí)候,事件消費(fèi)者如何等待新事件的到來(lái)。定義Consumer如何進(jìn)行等待下一個(gè)事件的策略。(注:Disruptor定義了多種不同的策略,針對(duì)不同的場(chǎng)景,提供了不一樣的性能表現(xiàn))Producer:生產(chǎn)者,只是泛指調(diào)用Disruptor發(fā)布事件的用戶代碼,Disruptor沒(méi)有定義特定接口或類型ProducerType:指定是單個(gè)事件發(fā)布者模式還是多個(gè)事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似)。Sequencer:Sequencer是Disruptor的真正核心。此接口有兩個(gè)實(shí)現(xiàn)類 -SingleProducerSequencer、MultiProducerSequencer,它們定義在生產(chǎn)者和消費(fèi)者之間快速、正確地傳遞數(shù)據(jù)的并發(fā)算法。Sequence Disruptor:通過(guò)順序遞增的序號(hào)來(lái)編號(hào)管理通過(guò)其進(jìn)行交換的數(shù)據(jù)(事件),對(duì)數(shù)據(jù)(事件)的處理過(guò)程總是沿著序號(hào)逐個(gè)遞增處理。一個(gè)Sequence用于跟蹤標(biāo)識(shí)某個(gè)特定的事件處理者(RingBuffer/Consumer)的處理進(jìn)度。
雖然一個(gè)AtomicLong也可以用于標(biāo)識(shí)進(jìn)度,但定義Sequence來(lái)負(fù)責(zé)該問(wèn)題還有另一個(gè)目的,那就是防止不同的Sequence之間的CPU緩存?zhèn)喂蚕?Flase Sharing)問(wèn)題。(注:這是Disruptor實(shí)現(xiàn)高性能的關(guān)鍵點(diǎn)之一)Sequence Barrier:用于保持對(duì)RingBuffer的main published Sequence和Consumer依賴的其它Consumer的Sequence的引用。Sequence Barrier還定義了決定Consumer是否還有可處理的事件的邏輯。

操作
坐標(biāo)依賴
pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>Gradle:
implementation 'com.lmax:disruptor:3.4.4'
創(chuàng)建事件
我們先來(lái)定義一個(gè)代表日志事件的類:LogEvent 。
事件中包含了一些和事件相關(guān)的屬性,比如我們這里定義的 LogEvent 對(duì)象中就有一個(gè)用來(lái)表示日志消息內(nèi)容的屬性:message。
@Data
public class LogEvent {
private String message;
}我們這里只是為了演示,實(shí)際項(xiàng)目中,一個(gè)標(biāo)準(zhǔn)日志事件對(duì)象所包含的屬性肯定不是只有一個(gè) message
創(chuàng)建事件工廠
創(chuàng)建一個(gè)工廠類 LogEventFactory 用來(lái)創(chuàng)建 LogEvent 對(duì)象。
LogEventFactory 繼承 EventFactory 接口并實(shí)現(xiàn)了 newInstance() 方法 。
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}創(chuàng)建處理事件Handler--消費(fèi)者
創(chuàng)建一個(gè)用于處理后續(xù)發(fā)布的事件的類:LogEventHandler 。
LogEventHandler 繼承 EventHandler 接口并實(shí)現(xiàn)了 onEvent() 方法 。
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.println(logEvent.getMessage());
}
}EventHandler 接口的 onEvent() 方法共有 3 個(gè)參數(shù):
event:待消費(fèi)/處理的事件sequence:正在處理的事件在環(huán)形數(shù)組(RingBuffer)中的位置endOfBatch:表示這是否是來(lái)自環(huán)形數(shù)組(RingBuffer)中一個(gè)批次的最后一個(gè)事件(批量處理事件)
初始化 Disruptor
靜態(tài)類
我們這里定義一個(gè)方法用于獲取 Disruptor 對(duì)象
private static Disruptor<LogEvent> getLogEventDisruptor() {
// 創(chuàng)建 LogEvent 的工廠
LogEventFactory logEventFactory = new LogEventFactory();
// Disruptor 的 RingBuffer 緩存大小
int bufferSize = 1024 * 1024;
// 生產(chǎn)者的線程工廠
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//實(shí)例化 Disruptor
return new Disruptor<>(
logEventFactory,
bufferSize,
threadFactory,
// 單生產(chǎn)者
ProducerType.SINGLE,
// 阻塞等待策略
new BlockingWaitStrategy());
}配置類
使用配置類的方式
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<LogEvent> messageModelRingBuffer() {
//定義用于事件處理的線程池, Disruptor通過(guò)java.util.concurrent.ExecutorSerivce提供的線程來(lái)觸發(fā)consumer的事件處理
// 生產(chǎn)者的線程工廠
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//指定事件工廠
LogEventFactory factory = new LogEventFactory();
//指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運(yùn)算轉(zhuǎn)為位運(yùn)算提高效率),否則將影響效率
int bufferSize = 1024 * 256;
//單線程模式,獲取額外的性能
Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
bufferSize,
threadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//設(shè)置事件業(yè)務(wù)處理器---消費(fèi)者
//Disruptor 的 handleEventsWith 方法來(lái)綁定處理事件的 Handler 對(duì)象。
disruptor.handleEventsWith(new LogEventHandler ());
// Disruptor 可以設(shè)置多個(gè)處理事件的 Handler,并且可以靈活的設(shè)置消費(fèi)者的處理順序,串行,并行都是可以的。
//就比如下面的代碼表示 Handler1 和 Handler2 是并行執(zhí)行,最后再執(zhí)行 Handler3 。
//disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
// 啟動(dòng)disruptor線程
disruptor.start();
//獲取ringbuffer環(huán),用于接取生產(chǎn)者生產(chǎn)的事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}Disruptor 構(gòu)造函數(shù)講解
Disruptor 的推薦使用的構(gòu)造函數(shù)如下:
public class Disruptor<T> {
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
......
}我們需要傳遞 5 個(gè)參數(shù):
eventFactory:我們自定義的事件工廠。ringBufferSize:指定RingBuffer的容量大小。threadFactory:自定義的線程工廠。Disruptor 的默認(rèn)線程池是自定義的,我們只需要傳入線程工廠即可。producerType:指定是單個(gè)事件發(fā)布者模式還是多個(gè)事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似,我個(gè)人比較喜歡用發(fā)布者)。waitStrategy:等待策略,決定了沒(méi)有事件可以消費(fèi)的時(shí)候,事件消費(fèi)者如何等待新事件的到來(lái)。
ProducerType 的源碼如下,它是一個(gè)包含兩個(gè)變量的枚舉類型
SINGLE:?jiǎn)蝹€(gè)事件發(fā)布者模式,不需要保證線程安全。MULTI:多個(gè)事件發(fā)布者模式,基于 CAS 來(lái)保證線程安全。
WaitStrategy (等待策略)接口的實(shí)現(xiàn)類中只有兩個(gè)方法:
waitFor():等待新事件的到來(lái)。signalAllWhenBlocking():?jiǎn)拘阉械却南M(fèi)者。
public interface WaitStrategy
{
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
void signalAllWhenBlocking();
}
WaitStrategy 的實(shí)現(xiàn)類共有 8 個(gè),也就是說(shuō)共有 8 種等待策略可供選擇。

除了上面介紹的這個(gè)構(gòu)造函數(shù)之外,Disruptor 還有一個(gè)只有 3 個(gè)參數(shù)構(gòu)造函數(shù)。
使用這個(gè)構(gòu)造函數(shù)創(chuàng)建的 Disruptor 對(duì)象會(huì)默認(rèn)使用 ProducerType.MULTI(多個(gè)事件發(fā)布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}發(fā)布事件main方法測(cè)試
//獲取 Disruptor 對(duì)象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//綁定處理事件的Handler對(duì)象
disruptor.handleEventsWith(new LogEventHandler());
//啟動(dòng) Disruptor
disruptor.start();
//獲取保存事件的環(huán)形數(shù)組(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//發(fā)布 10w 個(gè)事件
for (int i = 1; i <= 100000; i++) {
// 通過(guò)調(diào)用 RingBuffer 的 next() 方法獲取下一個(gè)空閑事件槽的序號(hào)
long sequence = ringBuffer.next();
try {
LogEvent logEvent = ringBuffer.get(sequence);
// 初始化 Event,對(duì)其賦值
logEvent.setMessage("這是第%d條日志消息".formatted(i));
} finally {
// 發(fā)布事件
ringBuffer.publish(sequence);
}
}
// 關(guān)閉 Disruptor
disruptor.shutdown();使用配置方式
public interface DisruptorMqService {
/**
* 消息
* @param message
*/
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<LogEvent> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//獲取下一個(gè)Event槽的下標(biāo)
long sequence = messageModelRingBuffer.next();
try {
//給Event填充數(shù)據(jù)
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息隊(duì)列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//發(fā)布Event,激活觀察者去消費(fèi),將sequence傳遞給改消費(fèi)者
//注意最后的publish方法必須放在finally中以確保必須得到調(diào)用;如果某個(gè)請(qǐng)求的sequence未被提交將會(huì)堵塞后續(xù)的發(fā)布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}以上就是Java集合之Disruptor操作示例的詳細(xì)內(nèi)容,更多關(guān)于Java集合Disruptor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot文件上傳功能簡(jiǎn)單測(cè)試
這篇文章主要介紹了Springboot文件上傳功能簡(jiǎn)單測(cè)試,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
SpringBoot中@Transiactional注解沒(méi)有效果的解決
這篇文章主要介紹了SpringBoot中@Transiactional注解沒(méi)有效果的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot切面實(shí)現(xiàn)token權(quán)限校驗(yàn)詳解
這篇文章主要介紹了SpringBoot切面實(shí)現(xiàn)token權(quán)限校驗(yàn)詳解,要實(shí)現(xiàn)權(quán)限校驗(yàn),首先數(shù)據(jù)表和實(shí)體類上需要有權(quán)限字段,我的表中permission和gender是通過(guò)外鍵約束permission表和gender表實(shí)現(xiàn)枚舉的,因?yàn)榭赏卣剐愿?需要的朋友可以參考下2024-01-01
解決"XML Parser Error on line 1: 前言中不允許有內(nèi)容"錯(cuò)誤
解決用windows自帶的記事編輯xml文件后出現(xiàn) "XML Parser Error on line 1: 前言中不允許有內(nèi)容。"的錯(cuò)誤2018-02-02
Spring 中使用Quartz實(shí)現(xiàn)任務(wù)調(diào)度
這篇文章主要介紹了Spring 中使用Quartz實(shí)現(xiàn)任務(wù)調(diào)度,Spring中使用Quartz 有兩種方式,感興趣的小伙伴們可以參考一下。2017-02-02

