SpringBoot基于Disruptor實(shí)現(xiàn)高效的消息隊(duì)列?
一、前言
Disruptor是一個(gè)開源的Java框架,它被設(shè)計(jì)用于在生產(chǎn)者-消費(fèi)者問題上獲得盡量高的吞吐量和盡量低的延遲,從功能上來看Disruptor是實(shí)現(xiàn)了隊(duì)列的功能,而且是一個(gè)有界隊(duì)列。那么它的應(yīng)用場(chǎng)景自然就是“生產(chǎn)者-消費(fèi)者”模型的應(yīng)用場(chǎng)合了。Disruptor 是在內(nèi)存中以隊(duì)列的方式去實(shí)現(xiàn)的,而且是無鎖的。這也是 Disruptor 為什么高效的原因。
二、SpringBoot整合Disruptor
1.添加依賴
<!--Disruptor--> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
2.創(chuàng)建消息體實(shí)體
package com.example.aopdemo.disruptor; import lombok.Data; /** * @author qx * @date 2024/2/21 * @des 消息體 */ @Data public class MessageModel { private String message; }
3.創(chuàng)建事件工廠類
package com.example.aopdemo.disruptor; import com.lmax.disruptor.EventFactory; /** * @author qx * @date 2024/2/21 * @des 事件工廠類 */ public class MessageEventFactory implements EventFactory<MessageModel> { @Override public MessageModel newInstance() { return new MessageModel(); } }
4.創(chuàng)建消費(fèi)者
package com.example.aopdemo.disruptor; import com.lmax.disruptor.EventHandler; import lombok.extern.slf4j.Slf4j; /** * @author qx * @date 2024/2/21 * @des 消息消費(fèi)者 */ @Slf4j public class MessageEventHandler implements EventHandler<MessageModel> { @Override public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) { log.info("消費(fèi)者獲取消息:{}", messageModel); } }
5.構(gòu)造BeanManager
package com.example.aopdemo.disruptor; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author qx * @date 2024/2/21 * @des */ @Component public class BeanManager implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { BeanManager.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } public static <T> T getBean(Class<T> clazz) { return applicationContext.getBean(clazz); } }
6.創(chuàng)建消息管理器
package com.example.aopdemo.disruptor; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author qx * @date 2024/2/21 * @des 事件管理器 */ @Configuration public class MessageManager { @Bean("messageModel") public RingBuffer<MessageModel> messageModelRingBuffer() { // 定義線程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 指定事件工廠 MessageEventFactory factory = new MessageEventFactory(); // 指定ringbuffer字節(jié)大小,必須為2的N次方(能將求模運(yùn)算轉(zhuǎn)為位運(yùn)算提高效率),否則將影響效率 int bufferSize = 1024 * 256; //單線程模式,獲取額外的性能 Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy()); //設(shè)置事件業(yè)務(wù)處理器---消費(fèi)者 disruptor.handleEventsWith(new MessageEventHandler()); //啟動(dòng)disruptor線程 disruptor.start(); //獲取ringbuffer環(huán),用于接取生產(chǎn)者生產(chǎn)的事件 return disruptor.getRingBuffer(); } }
7.創(chuàng)建生產(chǎn)者
package com.example.aopdemo.disruptor; import com.lmax.disruptor.RingBuffer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author qx * @date 2024/2/21 * @des 生產(chǎn)者 */ @Service @Slf4j public class DisruptorService { @Autowired private RingBuffer<MessageModel> messageModelRingBuffer; public void sayMessage(String message) { // 獲取下一個(gè)Event槽的下標(biāo) long sequence = messageModelRingBuffer.next(); try { // 填充數(shù)據(jù) MessageModel messageModel = messageModelRingBuffer.get(sequence); messageModel.setMessage(message); log.info("往消息隊(duì)列中添加消息:{}", messageModel); } catch (Exception e) { log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage()); } finally { //發(fā)布Event,激活觀察者去消費(fèi),將sequence傳遞給改消費(fèi)者 //注意最后的publish方法必須放在finally中以確保必須得到調(diào)用;如果某個(gè)請(qǐng)求的sequence未被提交將會(huì)堵塞后續(xù)的發(fā)布操作或者其他的producer messageModelRingBuffer.publish(sequence); } } }
8.創(chuàng)建測(cè)試類
package com.example.aopdemo.controller; import com.example.aopdemo.disruptor.DisruptorService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author qx * @date 2024/2/21 * @des Disruptor測(cè)試 */ @RestController public class DisruptorController { @Autowired private DisruptorService disruptorService; @GetMapping("/disruptor") public String disruptorTest(String message) { disruptorService.sayMessage(message); return "發(fā)送消息成功"; } }
9.測(cè)試
啟動(dòng)程序,在瀏覽器訪問請(qǐng)求連接進(jìn)行測(cè)試。
我們?cè)诳刂婆_(tái)上可以獲取到消息的發(fā)送和接收信息。
2024-02-21 15:22:16.059 INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService : 往消息隊(duì)列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060 INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler : 消費(fèi)者獲取消息:MessageModel(message=hello)
到此這篇關(guān)于SpringBoot基于Disruptor實(shí)現(xiàn)高效的消息隊(duì)列 的文章就介紹到這了,更多相關(guān)SpringBoot Disruptor消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法詳解
這篇文章主要介紹了Java實(shí)現(xiàn)數(shù)組去除重復(fù)數(shù)據(jù)的方法,結(jié)合實(shí)例形式詳細(xì)分析了java數(shù)組去除重復(fù)的幾種常用方法、實(shí)現(xiàn)原理與相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-09-09SpringBoot+layui實(shí)現(xiàn)文件上傳功能
Spring Boot是由Pivotal團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡(jiǎn)化新Spring應(yīng)用的初始搭建以及開發(fā)過程。這篇文章主要介紹了SpringBoot+layui實(shí)現(xiàn)文件上傳,需要的朋友可以參考下2018-09-09Java的中l(wèi)ombok下的@Builder注解用法詳解
這篇文章主要介紹了Java的中l(wèi)ombok下的@Builder注解用法詳解,lombok注解在java進(jìn)行編譯時(shí)進(jìn)行代碼的構(gòu)建,對(duì)于java對(duì)象的創(chuàng)建工作它可以更優(yōu)雅,不需要寫多余的重復(fù)的代碼,在出現(xiàn)lombok之后,對(duì)象的創(chuàng)建工作更提供Builder方法,需要的朋友可以參考下2023-11-11如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序
HTTP/HTTPS是最常見的協(xié)議套件之一,并且隨著智能手機(jī)的成功,它的應(yīng)用也日益廣泛,因?yàn)閷?duì)于任何公司來說,擁有一個(gè)可以被移動(dòng)設(shè)備訪問的網(wǎng)站幾乎是必須的。下面就來看看如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序2021-06-06