SpringBoot基于Disruptor實現(xiàn)高效的消息隊列?
一、前言
Disruptor是一個開源的Java框架,它被設(shè)計用于在生產(chǎn)者-消費者問題上獲得盡量高的吞吐量和盡量低的延遲,從功能上來看Disruptor是實現(xiàn)了隊列的功能,而且是一個有界隊列。那么它的應(yīng)用場景自然就是“生產(chǎn)者-消費者”模型的應(yīng)用場合了。Disruptor 是在內(nèi)存中以隊列的方式去實現(xiàn)的,而且是無鎖的。這也是 Disruptor 為什么高效的原因。
二、SpringBoot整合Disruptor
1.添加依賴
<!--Disruptor-->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2.創(chuàng)建消息體實體
package com.example.aopdemo.disruptor;
import lombok.Data;
/**
* @author qx
* @date 2024/2/21
* @des 消息體
*/
@Data
public class MessageModel {
private String message;
}3.創(chuàng)建事件工廠類
package com.example.aopdemo.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* @author qx
* @date 2024/2/21
* @des 事件工廠類
*/
public class MessageEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}4.創(chuàng)建消費者
package com.example.aopdemo.disruptor;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author qx
* @date 2024/2/21
* @des 消息消費者
*/
@Slf4j
public class MessageEventHandler implements EventHandler<MessageModel> {
@Override
public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {
log.info("消費者獲取消息:{}", messageModel);
}
}5.構(gòu)造BeanManager
package com.example.aopdemo.disruptor;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2024/2/21
* @des
*/
@Component
public class BeanManager implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
BeanManager.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}6.創(chuàng)建消息管理器
package com.example.aopdemo.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author qx
* @date 2024/2/21
* @des 事件管理器
*/
@Configuration
public class MessageManager {
@Bean("messageModel")
public RingBuffer<MessageModel> messageModelRingBuffer() {
// 定義線程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 指定事件工廠
MessageEventFactory factory = new MessageEventFactory();
// 指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運算轉(zhuǎn)為位運算提高效率),否則將影響效率
int bufferSize = 1024 * 256;
//單線程模式,獲取額外的性能
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());
//設(shè)置事件業(yè)務(wù)處理器---消費者
disruptor.handleEventsWith(new MessageEventHandler());
//啟動disruptor線程
disruptor.start();
//獲取ringbuffer環(huán),用于接取生產(chǎn)者生產(chǎn)的事件
return disruptor.getRingBuffer();
}
}7.創(chuàng)建生產(chǎn)者
package com.example.aopdemo.disruptor;
import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author qx
* @date 2024/2/21
* @des 生產(chǎn)者
*/
@Service
@Slf4j
public class DisruptorService {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
public void sayMessage(String message) {
// 獲取下一個Event槽的下標(biāo)
long sequence = messageModelRingBuffer.next();
try {
// 填充數(shù)據(jù)
MessageModel messageModel = messageModelRingBuffer.get(sequence);
messageModel.setMessage(message);
log.info("往消息隊列中添加消息:{}", messageModel);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
} finally {
//發(fā)布Event,激活觀察者去消費,將sequence傳遞給改消費者
//注意最后的publish方法必須放在finally中以確保必須得到調(diào)用;如果某個請求的sequence未被提交將會堵塞后續(xù)的發(fā)布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}8.創(chuàng)建測試類
package com.example.aopdemo.controller;
import com.example.aopdemo.disruptor.DisruptorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2024/2/21
* @des Disruptor測試
*/
@RestController
public class DisruptorController {
@Autowired
private DisruptorService disruptorService;
@GetMapping("/disruptor")
public String disruptorTest(String message) {
disruptorService.sayMessage(message);
return "發(fā)送消息成功";
}
}9.測試
啟動程序,在瀏覽器訪問請求連接進(jìn)行測試。

我們在控制臺上可以獲取到消息的發(fā)送和接收信息。
2024-02-21 15:22:16.059 INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService : 往消息隊列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060 INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler : 消費者獲取消息:MessageModel(message=hello)
到此這篇關(guān)于SpringBoot基于Disruptor實現(xiàn)高效的消息隊列 的文章就介紹到這了,更多相關(guān)SpringBoot Disruptor消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法詳解
這篇文章主要介紹了Java實現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法,結(jié)合實例形式詳細(xì)分析了java數(shù)組去除重復(fù)的幾種常用方法、實現(xiàn)原理與相關(guān)注意事項,需要的朋友可以參考下2017-09-09
SpringBoot+layui實現(xiàn)文件上傳功能
Spring Boot是由Pivotal團(tuán)隊提供的全新框架,其設(shè)計目的是用來簡化新Spring應(yīng)用的初始搭建以及開發(fā)過程。這篇文章主要介紹了SpringBoot+layui實現(xiàn)文件上傳,需要的朋友可以參考下2018-09-09
Java的中l(wèi)ombok下的@Builder注解用法詳解
這篇文章主要介紹了Java的中l(wèi)ombok下的@Builder注解用法詳解,lombok注解在java進(jìn)行編譯時進(jìn)行代碼的構(gòu)建,對于java對象的創(chuàng)建工作它可以更優(yōu)雅,不需要寫多余的重復(fù)的代碼,在出現(xiàn)lombok之后,對象的創(chuàng)建工作更提供Builder方法,需要的朋友可以參考下2023-11-11
如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序
HTTP/HTTPS是最常見的協(xié)議套件之一,并且隨著智能手機(jī)的成功,它的應(yīng)用也日益廣泛,因為對于任何公司來說,擁有一個可以被移動設(shè)備訪問的網(wǎng)站幾乎是必須的。下面就來看看如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序2021-06-06

