欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java集合之Disruptor操作示例

 更新時間:2023年08月20日 10:49:33   作者:上善若淚  
這篇文章主要為大家介紹了Java集合之Disruptor操作示例介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

Disruptor簡介

Disruptor 是一個開源的高性能內(nèi)存隊列,由英國外匯交易公司 LMAX 開發(fā)的,獲得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 選擇大獎)。

Disruptor 提供的功能類似于 Kafka、RocketMQ 這類分布式隊列,不過,其作為范圍是 JVM(內(nèi)存),Disruptor 解決了 JDK 內(nèi)置線程安全隊列的性能和內(nèi)存安全問題,Disruptor 有個最大的優(yōu)點就是快

Disruptor被設(shè)計用于在生產(chǎn)者消費者producer-consumer problem,簡稱PCP)問題上獲得盡量高的吞吐量(TPS)和盡量低的延遲
Disruptor是LMAX在線交易平臺的關(guān)鍵組成部分,LMAX平臺使用該框架對訂單處理速度能達到600萬TPS,除金融領(lǐng)域之外,其他一般的應(yīng)用中都可以用到 Disruptor,它可以帶來顯著的性能提升。其實 Disruptor 與其說是一個框架,不如說是一種設(shè)計思路,這個設(shè)計思路對于存在并發(fā)、緩沖區(qū)、生產(chǎn)者—消費者模型、事務(wù)處理這些元素的程序來說,Disruptor 提出了一種大幅提升性能(TPS)的方案。

github 地址

Github 地址:https://github.com/LMAX-Exchange/disruptor

官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

Java中線程安全隊列

JDK 中常見的線程安全的隊列如下:

隊列名字是否有界
ArrayBlockingQueue加鎖(ReentrantLock)有界
LinkedBlockingQueue加鎖(ReentrantLock)有界
LinkedTransferQueue無鎖(CAS)無界
ConcurrentLinkedQueue無鎖(CAS)無界

從上表中可以看出:這些隊列要不就是加鎖有界,要不就是無鎖無界。而加鎖的的隊列勢必會影響性能,無界的隊列又存在內(nèi)存溢出的風(fēng)險。
因此,一般情況下,我們都是不建議使用 JDK 內(nèi)置線程安全隊列。
Disruptor 就不一樣了!它在無鎖的情況下還能保證隊列有界,并且還是線程安全的。

Disruptor 核心概念

Disruptor 核心概念:

  • Event:可以把 Event 理解為存放在隊列中等待消費的消息對象。
    Disruptor 的語義中,生產(chǎn)者和消費者之間進行交換的數(shù)據(jù)被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定。
  • EventFactory:事件工廠用于生產(chǎn)事件,我們在初始化 Disruptor 類的時候需要用到。
  • EventHandlerEvent 在對應(yīng)的 Handler 中被處理,你可以將其理解為生產(chǎn)消費者模型中的消費者。
    Disruptor 定義的事件處理接口,由用戶實現(xiàn),用于處理事件,是 Consumer 的真正實現(xiàn)
  • EventProcessorEventProcessor 持有特定消費者(Consumer)的 Sequence,并提供用于調(diào)用事件處理實現(xiàn)的事件循環(huán)(Event Loop)
  • Disruptor:事件的生產(chǎn)和消費需要用到 Disruptor 對象。
  • RingBufferRingBuffer(環(huán)形數(shù)組)用于保存事件
    如其名,環(huán)形的緩沖區(qū)。曾經(jīng) RingBufferDisruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數(shù)據(jù)(事件)進行存儲和更新。在一些更高級的應(yīng)用場景中,Ring Buffer 可以由用戶的自定義實現(xiàn)來完全替代。
  • WaitStrategy:等待策略。決定了沒有事件可以消費的時候,事件消費者如何等待新事件的到來。定義 Consumer 如何進行等待下一個事件的策略。(注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現(xiàn))
  • Producer:生產(chǎn)者,只是泛指調(diào)用 Disruptor 發(fā)布事件的用戶代碼,Disruptor 沒有定義特定接口或類型
  • ProducerType:指定是單個事件發(fā)布者模式還是多個事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似)。
  • SequencerSequencerDisruptor 的真正核心。此接口有兩個實現(xiàn)類 - SingleProducerSequencerMultiProducerSequencer ,它們定義在生產(chǎn)者和消費者之間快速、正確地傳遞數(shù)據(jù)的并發(fā)算法。
  • Sequence Disruptor:通過順序遞增的序號來編號管理通過其進行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。
    雖然一個 AtomicLong 也可以用于標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的 CPU 緩存?zhèn)喂蚕?Flase Sharing)問題。(注:這是 Disruptor 實現(xiàn)高性能的關(guān)鍵點之一)
  • Sequence Barrier:用于保持對 RingBuffermain published SequenceConsumer 依賴的其它 ConsumerSequence 的引用。Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

操作

坐標依賴

pom.xml

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

創(chuàng)建事件

我們先來定義一個代表日志事件的類:LogEvent 。

事件中包含了一些和事件相關(guān)的屬性,比如我們這里定義的 LogEvent 對象中就有一個用來表示日志消息內(nèi)容的屬性:message。

@Data
public class LogEvent {
    private String message;
}

我們這里只是為了演示,實際項目中,一個標準日志事件對象所包含的屬性肯定不是只有一個 message

創(chuàng)建事件工廠

創(chuàng)建一個工廠類 LogEventFactory 用來創(chuàng)建 LogEvent 對象。
LogEventFactory 繼承 EventFactory 接口并實現(xiàn)了 newInstance() 方法 。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

創(chuàng)建處理事件Handler--消費者

創(chuàng)建一個用于處理后續(xù)發(fā)布的事件的類:LogEventHandler 。
LogEventHandler 繼承 EventHandler 接口并實現(xiàn)了 onEvent() 方法 。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 個參數(shù):

  • event:待消費/處理的事件
  • sequence:正在處理的事件在環(huán)形數(shù)組(RingBuffer)中的位置
  • endOfBatch:表示這是否是來自環(huán)形數(shù)組(RingBuffer)中一個批次的最后一個事件(批量處理事件)

初始化 Disruptor

靜態(tài)類

我們這里定義一個方法用于獲取 Disruptor 對象

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 創(chuàng)建 LogEvent 的工廠
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 緩存大小
    int bufferSize = 1024 * 1024;
    // 生產(chǎn)者的線程工廠
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //實例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 單生產(chǎn)者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}

配置類

使用配置類的方式

@Configuration
public class MQManager {
    @Bean("messageModel")
    public RingBuffer<LogEvent> messageModelRingBuffer() {
        //定義用于事件處理的線程池, Disruptor通過java.util.concurrent.ExecutorSerivce提供的線程來觸發(fā)consumer的事件處理
        // 生產(chǎn)者的線程工廠
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
        //指定事件工廠
        LogEventFactory factory = new LogEventFactory();
        //指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運算轉(zhuǎn)為位運算提高效率),否則將影響效率
        int bufferSize = 1024 * 256;
        //單線程模式,獲取額外的性能
        Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
             bufferSize, 
             threadFactory,
             ProducerType.SINGLE, 
             new BlockingWaitStrategy());
        //設(shè)置事件業(yè)務(wù)處理器---消費者
        //Disruptor 的 handleEventsWith 方法來綁定處理事件的 Handler 對象。
        disruptor.handleEventsWith(new LogEventHandler ());
      // Disruptor 可以設(shè)置多個處理事件的 Handler,并且可以靈活的設(shè)置消費者的處理順序,串行,并行都是可以的。
       //就比如下面的代碼表示 Handler1 和 Handler2 是并行執(zhí)行,最后再執(zhí)行 Handler3 。
       //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
        // 啟動disruptor線程
        disruptor.start();
        //獲取ringbuffer環(huán),用于接取生產(chǎn)者生產(chǎn)的事件
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        return ringBuffer;
    }

Disruptor 構(gòu)造函數(shù)講解

Disruptor 的推薦使用的構(gòu)造函數(shù)如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }
......
}

我們需要傳遞 5 個參數(shù):

  • eventFactory:我們自定義的事件工廠。
  • ringBufferSize:指定 RingBuffer 的容量大小。
  • threadFactory:自定義的線程工廠。Disruptor 的默認線程池是自定義的,我們只需要傳入線程工廠即可。
  • producerType:指定是單個事件發(fā)布者模式還是多個事件發(fā)布者模式(發(fā)布者和生產(chǎn)者的意思類似,我個人比較喜歡用發(fā)布者)。
  • waitStrategy:等待策略,決定了沒有事件可以消費的時候,事件消費者如何等待新事件的到來。

ProducerType 的源碼如下,它是一個包含兩個變量的枚舉類型

  • SINGLE:單個事件發(fā)布者模式,不需要保證線程安全。
  • MULTI:多個事件發(fā)布者模式,基于 CAS 來保證線程安全。

WaitStrategy (等待策略)接口的實現(xiàn)類中只有兩個方法:

  • waitFor():等待新事件的到來。
  • signalAllWhenBlocking():喚醒所有等待的消費者。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的實現(xiàn)類共有 8 個,也就是說共有 8 種等待策略可供選擇。

除了上面介紹的這個構(gòu)造函數(shù)之外,Disruptor 還有一個只有 3 個參數(shù)構(gòu)造函數(shù)。

使用這個構(gòu)造函數(shù)創(chuàng)建的 Disruptor 對象會默認使用 ProducerType.MULTI(多個事件發(fā)布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

發(fā)布事件main方法測試

//獲取 Disruptor 對象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//綁定處理事件的Handler對象
disruptor.handleEventsWith(new LogEventHandler());
//啟動 Disruptor
disruptor.start();
//獲取保存事件的環(huán)形數(shù)組(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//發(fā)布 10w 個事件
for (int i = 1; i <= 100000; i++) {
    // 通過調(diào)用 RingBuffer 的 next() 方法獲取下一個空閑事件槽的序號
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event,對其賦值
        logEvent.setMessage("這是第%d條日志消息".formatted(i));
    } finally {
        // 發(fā)布事件
        ringBuffer.publish(sequence);
    }
}
// 關(guān)閉 Disruptor
disruptor.shutdown();

使用配置方式

public interface DisruptorMqService {
    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
    @Autowired
    private RingBuffer<LogEvent> messageModelRingBuffer;
    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //獲取下一個Event槽的下標
        long sequence = messageModelRingBuffer.next();
        try {
            //給Event填充數(shù)據(jù)
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息隊列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //發(fā)布Event,激活觀察者去消費,將sequence傳遞給改消費者
            //注意最后的publish方法必須放在finally中以確保必須得到調(diào)用;如果某個請求的sequence未被提交將會堵塞后續(xù)的發(fā)布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

以上就是Java集合之Disruptor操作示例的詳細內(nèi)容,更多關(guān)于Java集合Disruptor的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java獲取中文拼音首字母的實例

    java獲取中文拼音首字母的實例

    下面小編就為大家?guī)硪黄猨ava獲取中文拼音首字母的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • Springboot文件上傳功能簡單測試

    Springboot文件上傳功能簡單測試

    這篇文章主要介紹了Springboot文件上傳功能簡單測試,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-05-05
  • SpringBoot中@Transiactional注解沒有效果的解決

    SpringBoot中@Transiactional注解沒有效果的解決

    這篇文章主要介紹了SpringBoot中@Transiactional注解沒有效果的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java中Shiro安全框架的權(quán)限管理

    Java中Shiro安全框架的權(quán)限管理

    這篇文章主要介紹了Java中Shiro安全框架的權(quán)限管理,Apache?Shiro是Java的一個安全框架,Shiro可以非常容易的開發(fā)出足夠好的應(yīng)用,其不僅可以用在JavaSE環(huán)境,也可以用在JavaEE環(huán)境,需要的朋友可以參考下
    2023-08-08
  • Java位運算知識點詳解

    Java位運算知識點詳解

    這篇文章給大家分享了關(guān)于Java位運算的相關(guān)知識點內(nèi)容,有興趣的朋友們可以學(xué)習(xí)參考下。
    2018-09-09
  • SpringBoot切面實現(xiàn)token權(quán)限校驗詳解

    SpringBoot切面實現(xiàn)token權(quán)限校驗詳解

    這篇文章主要介紹了SpringBoot切面實現(xiàn)token權(quán)限校驗詳解,要實現(xiàn)權(quán)限校驗,首先數(shù)據(jù)表和實體類上需要有權(quán)限字段,我的表中permission和gender是通過外鍵約束permission表和gender表實現(xiàn)枚舉的,因為可拓展性更好,需要的朋友可以參考下
    2024-01-01
  • 解決

    解決"XML Parser Error on line 1: 前言中不允許有內(nèi)容"錯誤

    解決用windows自帶的記事編輯xml文件后出現(xiàn) "XML Parser Error on line 1: 前言中不允許有內(nèi)容。"的錯誤
    2018-02-02
  • idea如何通過maven指定JDK版本

    idea如何通過maven指定JDK版本

    這篇文章主要介紹了idea如何通過maven指定JDK版本問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • Spring 中使用Quartz實現(xiàn)任務(wù)調(diào)度

    Spring 中使用Quartz實現(xiàn)任務(wù)調(diào)度

    這篇文章主要介紹了Spring 中使用Quartz實現(xiàn)任務(wù)調(diào)度,Spring中使用Quartz 有兩種方式,感興趣的小伙伴們可以參考一下。
    2017-02-02
  • mybatis輸出SQL格式化方式

    mybatis輸出SQL格式化方式

    這篇文章主要介紹了mybatis輸出SQL格式化方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11

最新評論