redis做服務間通信工具的項目示例
前言
先說一下為什么要有這個東西,用消息中間件的好處就不用說了,日常開發(fā)中還是有很多場景需要用到消息傳遞的,消息的topic如何管理,如何約束topic,重要的topic消費記錄、歷史消息等就是這個sdk需要做的。
本質(zhì)上只是一層對消息中間件的封裝。這次只是拋磚引玉只引入redis的三種消息類型,pubsub、queue以及stream。
擴展其他中間件按著代碼思路一樣。望各路大佬賜教
架構(gòu)設計
一個消息服務sdk首先需要具備兩個能力,即生產(chǎn)和消費,這兩個功能離不開校驗topic合法性,我們姑且簡單點陪在mysql數(shù)據(jù)庫中,但不可能每次校驗topic是否合法都要去查詢數(shù)據(jù)庫,這里借鑒kafka存放topic信息的思想,找一個redis的key存放所有的topic列表。
定義一個核心service接口。
public?interface?MessageHubService?{ ????/** ?????*?生產(chǎn)消息 ?????*/ ????void?producer(MessageForm?messageForm); ????/** ?????*?消費消息 ?????*/ ????void?consumer(ConsumerAdapterForm?adapterForm); ????/** ?????*?檢查topic、type合法性 ?????*/ ????void?checkTopic(String?topic,?String?type); }
方法入?yún)⒔y(tǒng)一使用MessageForm類,里面定義一些基礎的信息,比如哪個消息topic,哪個消息類型等等。
@Data public class MessageForm { // 消息組件類型 private String type; // 消息主題 private String topic; private String message = ""; // 消費者組 private String group = "UPTOWN"; }
自從之前文章中說的文件夾改造之后特別喜歡三層結(jié)構(gòu),即service、baseServiceImpl、customizeServiceImpl。
大體就是service定義接口參數(shù)、返回類型標準化接口,baseServiceImpl實現(xiàn)service基礎接口實現(xiàn),做一些統(tǒng)一的攔截處理,比如校驗topic合法等操作,customizeServiceImpl屬于具體實現(xiàn)類extends baseServiceImpl實現(xiàn)具體邏輯。
topic白名單通過Timer維護,定義一個Timer通過lua腳本隔一段時間刷新到redis中。
基礎類baseServiceImpl實現(xiàn)
@Service public?class?MessageHubServiceImpl?implements?MessageHubService,?ApplicationContextAware?{ ????@Resource ????protected?StringRedisTemplate?stringRedisTemplate; ????public?Map<String,?MessageHubService>?messageHubServiceMap?=?new?ConcurrentHashMap<>(); ????private?ApplicationContext?applicationContext; ????@PostConstruct ????public?void?init()?{ ????????messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE,?applicationContext.getBean(RedisPubSubProcessor.class)); ????????messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE,?applicationContext.getBean(RedisQueueProcessor.class)); ????????messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE,?applicationContext.getBean(RedisStreamProcessor.class)); ????} ????public?void?checkTopic(String?topic,?String?type)?{ ????????if?(!messageHubServiceMap.containsKey(type))?{ ????????????throw?new?MatrixException("消息類型不支持"); ????????} ????????List<String>?whiteTopicList?=?stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC,?0,?-1); ????????if?((!ObjectUtils.isEmpty(whiteTopicList)?&&?!whiteTopicList.contains(topic))?||?ObjectUtils.isEmpty(whiteTopicList))?{ ????????????throw?new?MatrixException("當前topic未配置"); ????????} ????} ????@Override ????public?void?producer(MessageForm?messageForm)?{ ????????this.checkTopic(messageForm.getTopic(),?messageForm.getType()); ????????this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm); ????} ????/** ?????*?消費者創(chuàng)建通過注解,已校驗topic合法性 ?????*/ ????@Override ????public?void?consumer(ConsumerAdapterForm?messageForm)?{ ????????this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm); ????} ????@Override ????public?void?setApplicationContext(ApplicationContext?applicationContext)?throws?BeansException?{ ????????this.applicationContext?=?applicationContext; ????} }
具體自定義實現(xiàn)類
@Scope(proxyMode?=?ScopedProxyMode.TARGET_CLASS) @Service("redisPubSubProcessor") public?class?RedisPubSubProcessor?extends?MessageHubServiceImpl?{ ????@Override ????public?void?producer(MessageForm?messageForm)?{ ????????//?具體生產(chǎn)邏輯 ????} ????@Override ????public?void?consumer(ConsumerAdapterForm?messageForm)?{ ????????//?具體消費邏輯 ????} }
代碼非常清晰了,整體滿足service、baseServiceImpl、customizeServiceImpl三層結(jié)構(gòu)。
生產(chǎn)者邏輯
生產(chǎn)者API做的比較簡單,只是提供一個API調(diào)用,在調(diào)用前做一些校驗工作,僅僅的是一條命令,不做發(fā)送失敗的重試等操作。
消費者邏輯
消費者的話還是定義一個注解,還是通過借助SpringBoot生命周期掃描注解的方式在后臺建立常駐線程的方式。
@Slf4j @Component public?class?ConsumerConfig?implements?DisposableBean,?SmartInstantiationAwareBeanPostProcessor?{ ????@Resource(name?=?"messageHubServiceImpl") ????MessageHubService?messageHubService; ????@Bean(name?=?"redisPubSubConsumerMap") ????public?Map<String,?MessageListenerAdapter>?redisPubSubConsumerMap()?{ ????????return?new?ConcurrentHashMap<>(); ????} ????@Override ????public?void?destroy()?throws?Exception?{ ????} ????@Override ????public?Object?getEarlyBeanReference(Object?bean,?String?beanName)?throws?BeansException?{ ????????Method[]?methods?=?ReflectionUtils.getAllDeclaredMethods(bean.getClass()); ????????for?(Method?method?:?methods)?{ ????????????MessageHub?annotation?=?AnnotationUtils.findAnnotation(method,?MessageHub.class); ????????????if?(annotation?==?null)?{ ????????????????continue; ????????????} ????????????String?resolveTopic?=?annotation.topic(); ????????????try?{ ????????????????messageHubService.checkTopic(resolveTopic,?annotation.type()); ????????????}?catch?(Exception?e)?{ ????????????????throw?new?Error(e.getMessage()); ????????????} ????????????ConsumerAdapterForm?adapterForm?=?new?ConsumerAdapterForm(); ????????????adapterForm.setBean(bean); ????????????adapterForm.setInvokeMethod(method); ????????????adapterForm.setTopic(resolveTopic); ????????????adapterForm.setType(annotation.type()); ????????????adapterForm.setGroup(annotation.group()); ????????????messageHubService.consumer(adapterForm); ????????} ????????return?bean; ????} }
這里依靠spring生命周期,拿到所有的bean,根據(jù)注解標注的方法去走不同的邏輯生成常駐線程,監(jiān)聽到消息之后回調(diào)到標注了注解的方法里。
具體的消費邏輯就不贅述了,感興趣的可以看下源碼:gitee.com/atuptown/up…
Topic守護線程
@Slf4j @Service public?class?TopicReloadTask?extends?TimerTask?{ ????@Resource ????StringRedisTemplate?stringRedisTemplate; ????@Resource ????EntityManager?entityManager; ????public?final?String?TOPIC_SQL?=?"?select?*?from?MESSAGEHUB_TOPIC?"; ????public?final?String?LUA_SCRIPT?= ????????????????"redis.call('del',?'MESSAGEHUB_TOPIC')"?+ ????????????????"local?topics?=?KEYS?"?+ ????????????????"for?i,?v?in?pairs(topics)?do?"?+ ????????????????"??redis.call('lpush',?'MESSAGEHUB_TOPIC',?v)?"?+ ????????????????"end"; ????@Override ????public?void?run()?{ ????????try?{ ????????????List<String>?topics?=?this.getQueryResult(TOPIC_SQL,?MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList()); ????????????if?(!ObjectUtils.isEmpty(topics))?{ ????????????????DefaultRedisScript<Long>?redisScript?=?new?DefaultRedisScript<>(LUA_SCRIPT,?Long.class); ????????????????Long?result?=?stringRedisTemplate.execute(redisScript,?topics); ????????????????log.info("reload?topic?finish"); ????????????} ????????}?catch?(Throwable?t)?{ ????????????log.error("messagehub?topic?reload?error",?t); ????????} ????} ????private?<T>?List<T>?getQueryResult(String?sql,?Class<T>?clazz)?{ ????????Query?dataQuery?=?entityManager.createNativeQuery(sql,?clazz); ????????List<T>?result?=?new?ArrayList<>(); ????????List<Object>?list?=?dataQuery.getResultList(); ????????for?(Object?o?:?list)?{ ????????????result.add((T)?o); ????????} ????????return?result; ????} }
定義一個timer任務,隔一段時間將mysql中的topic白名單通過lua腳本的方式刷新到指定的reids topic key中。還有一些可以優(yōu)化的地方,比如同步topic的操作只需要一個服務即可,所以可以使用@ConditionalOnProperty注解判斷是否需要進行同步topic。
git地址:https://gitee.com/atuptown/uptown-messagehub
到此這篇關(guān)于redis做服務間通信工具的項目示例的文章就介紹到這了,更多相關(guān)redis 服務間通信 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合ShardingSphere5.x實現(xiàn)數(shù)據(jù)加解密功能(最新推薦)
這篇文章主要介紹了SpringBoot整合ShardingSphere5.x實現(xiàn)數(shù)據(jù)加解密功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06JavaWeb 入門篇:創(chuàng)建Web項目,Idea配置tomcat
這篇文章主要介紹了IDEA創(chuàng)建web項目配置Tomcat的詳細教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-07-07如何在Intellij中安裝LeetCode刷題插件方便Java刷題
這篇文章主要介紹了如何在Intellij中安裝LeetCode刷題插件方便Java刷題,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08使用Spring從YAML文件讀取內(nèi)容映射為Map方式
這篇文章主要介紹了使用Spring從YAML文件讀取內(nèi)容映射為Map方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02使用SpringBoot讀取Windows共享文件的代碼示例
在現(xiàn)代企業(yè)環(huán)境中,文件共享是一個常見的需求,Windows共享文件夾(SMB/CIFS協(xié)議)因其易用性和廣泛的兼容性,成為了許多企業(yè)的首選,在Java應用中,尤其是使用Spring Boot框架時,如何讀取Windows共享文件是一個值得探討的話題,本文介紹了使用SpringBoot讀取Windows共享文件2024-11-11