欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot基于Disruptor實(shí)現(xiàn)高效的消息隊(duì)列?

 更新時(shí)間:2024年02月22日 09:04:48   作者:wx59bcc77095d22  
Disruptor是一個(gè)開源的Java框架,它被設(shè)計(jì)用于在生產(chǎn)者-消費(fèi)者問題上獲得盡量高的吞吐量和盡量低的延遲,本文主要介紹了SpringBoot基于Disruptor實(shí)現(xiàn)高效的消息隊(duì)列?,具有一定的參考價(jià)值,感興趣的可以了解一下

一、前言

Disruptor是一個(gè)開源的Java框架,它被設(shè)計(jì)用于在生產(chǎn)者-消費(fèi)者問題上獲得盡量高的吞吐量和盡量低的延遲,從功能上來看Disruptor是實(shí)現(xiàn)了隊(duì)列的功能,而且是一個(gè)有界隊(duì)列。那么它的應(yīng)用場(chǎng)景自然就是“生產(chǎn)者-消費(fèi)者”模型的應(yīng)用場(chǎng)合了。Disruptor 是在內(nèi)存中以隊(duì)列的方式去實(shí)現(xiàn)的,而且是無鎖的。這也是 Disruptor 為什么高效的原因。

二、SpringBoot整合Disruptor

1.添加依賴

<!--Disruptor-->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.創(chuàng)建消息體實(shí)體

package com.example.aopdemo.disruptor;

import lombok.Data;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息體
 */
@Data
public class MessageModel {

    private String message;

}

3.創(chuàng)建事件工廠類

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件工廠類
 */
public class MessageEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

4.創(chuàng)建消費(fèi)者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息消費(fèi)者
 */
@Slf4j
public class MessageEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {
        log.info("消費(fèi)者獲取消息:{}", messageModel);
    }
}

5.構(gòu)造BeanManager

package com.example.aopdemo.disruptor;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2024/2/21
 * @des
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanManager.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

6.創(chuàng)建消息管理器

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件管理器
 */
@Configuration
public class MessageManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // 定義線程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 指定事件工廠
        MessageEventFactory factory = new MessageEventFactory();

        // 指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運(yùn)算轉(zhuǎn)為位運(yùn)算提高效率),否則將影響效率
        int bufferSize = 1024 * 256;

        //單線程模式,獲取額外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());

        //設(shè)置事件業(yè)務(wù)處理器---消費(fèi)者
        disruptor.handleEventsWith(new MessageEventHandler());

        //啟動(dòng)disruptor線程
        disruptor.start();

        //獲取ringbuffer環(huán),用于接取生產(chǎn)者生產(chǎn)的事件
        return disruptor.getRingBuffer();
    }

}

7.創(chuàng)建生產(chǎn)者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2024/2/21
 * @des 生產(chǎn)者
 */
@Service
@Slf4j
public class DisruptorService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayMessage(String message) {
        // 獲取下一個(gè)Event槽的下標(biāo)
        long sequence = messageModelRingBuffer.next();
        try {
            // 填充數(shù)據(jù)
            MessageModel messageModel = messageModelRingBuffer.get(sequence);
            messageModel.setMessage(message);
            log.info("往消息隊(duì)列中添加消息:{}", messageModel);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
        } finally {
            //發(fā)布Event,激活觀察者去消費(fèi),將sequence傳遞給改消費(fèi)者
            //注意最后的publish方法必須放在finally中以確保必須得到調(diào)用;如果某個(gè)請(qǐng)求的sequence未被提交將會(huì)堵塞后續(xù)的發(fā)布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }

    }

}

8.創(chuàng)建測(cè)試類

package com.example.aopdemo.controller;

import com.example.aopdemo.disruptor.DisruptorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2024/2/21
 * @des Disruptor測(cè)試
 */
@RestController
public class DisruptorController {

    @Autowired
    private DisruptorService disruptorService;

    @GetMapping("/disruptor")
    public String disruptorTest(String message) {
        disruptorService.sayMessage(message);
        return "發(fā)送消息成功";
    }
}

9.測(cè)試

啟動(dòng)程序,在瀏覽器訪問請(qǐng)求連接進(jìn)行測(cè)試。

我們?cè)诳刂婆_(tái)上可以獲取到消息的發(fā)送和接收信息。

2024-02-21 15:22:16.059  INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService   : 往消息隊(duì)列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060  INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler      : 消費(fèi)者獲取消息:MessageModel(message=hello)

到此這篇關(guān)于SpringBoot基于Disruptor實(shí)現(xiàn)高效的消息隊(duì)列 的文章就介紹到這了,更多相關(guān)SpringBoot Disruptor消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java實(shí)現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法詳解

    Java實(shí)現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法詳解

    這篇文章主要介紹了Java實(shí)現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法,結(jié)合實(shí)例形式詳細(xì)分析了java數(shù)組去除重復(fù)的幾種常用方法、實(shí)現(xiàn)原理與相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2017-09-09
  • SpringBoot+layui實(shí)現(xiàn)文件上傳功能

    SpringBoot+layui實(shí)現(xiàn)文件上傳功能

    Spring Boot是由Pivotal團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡(jiǎn)化新Spring應(yīng)用的初始搭建以及開發(fā)過程。這篇文章主要介紹了SpringBoot+layui實(shí)現(xiàn)文件上傳,需要的朋友可以參考下
    2018-09-09
  • Java基礎(chǔ)之自定義類加載器

    Java基礎(chǔ)之自定義類加載器

    應(yīng)該有很多小伙伴還不了解Java自定義類加載器吧,下文中有對(duì)Java自定義類加載器非常詳細(xì)的描述,還有小伙伴們最喜歡的代碼環(huán)節(jié),需要的朋友可以參考下
    2021-05-05
  • 查看Java所支持的語言及相應(yīng)的版本信息

    查看Java所支持的語言及相應(yīng)的版本信息

    Java語言作為第一種支持國際化的語言,在Internet從一開始就具有其他語言無與倫比的國際化的本質(zhì)特性,查看Java所支持的語言及相應(yīng)的版本信息可以采用以下代碼進(jìn)行查詢
    2014-01-01
  • 一文詳解如何使用Java來發(fā)送qq郵箱郵件

    一文詳解如何使用Java來發(fā)送qq郵箱郵件

    這篇文章主要給大家介紹了關(guān)于如何使用Java來發(fā)送qq郵箱郵件的相關(guān)資料,文中降了準(zhǔn)備工作(開啟服務(wù)并生成授權(quán)碼)、接口調(diào)用(引入依賴和編寫接口代碼)、發(fā)送HTML格式郵件等內(nèi)容,需要的朋友可以參考下
    2024-12-12
  • Java用遞歸方法解決漢諾塔問題詳解

    Java用遞歸方法解決漢諾塔問題詳解

    漢諾塔問題是一個(gè)經(jīng)典的問題。漢諾塔(Hanoi?Tower),又稱河內(nèi)塔,源于印度一個(gè)古老傳說。本文將用Java遞歸方法求解這一問題,感興趣的可以學(xué)習(xí)一下
    2022-04-04
  • Java的中l(wèi)ombok下的@Builder注解用法詳解

    Java的中l(wèi)ombok下的@Builder注解用法詳解

    這篇文章主要介紹了Java的中l(wèi)ombok下的@Builder注解用法詳解,lombok注解在java進(jìn)行編譯時(shí)進(jìn)行代碼的構(gòu)建,對(duì)于java對(duì)象的創(chuàng)建工作它可以更優(yōu)雅,不需要寫多余的重復(fù)的代碼,在出現(xiàn)lombok之后,對(duì)象的創(chuàng)建工作更提供Builder方法,需要的朋友可以參考下
    2023-11-11
  • 如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序

    如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序

    HTTP/HTTPS是最常見的協(xié)議套件之一,并且隨著智能手機(jī)的成功,它的應(yīng)用也日益廣泛,因?yàn)閷?duì)于任何公司來說,擁有一個(gè)可以被移動(dòng)設(shè)備訪問的網(wǎng)站幾乎是必須的。下面就來看看如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序
    2021-06-06
  • Java線程池由淺入深掌握到精通

    Java線程池由淺入深掌握到精通

    什么是線程池?很簡(jiǎn)單,簡(jiǎn)單看名字就知道是裝有線程的池子,我們可以把要執(zhí)行的多線程交給線程池來處理,和連接池的概念一樣,通過維護(hù)一定數(shù)量的線程池來達(dá)到多個(gè)線程的復(fù)用
    2021-09-09
  • Java 輸入流中的read(byte[] b)方法詳解

    Java 輸入流中的read(byte[] b)方法詳解

    這篇文章主要介紹了Java 輸入流中的read(byte[] b)方法詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評(píng)論