RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級二次封裝
前言說明
前置掌握:SpringBoot基礎(chǔ)使用、RocketMQ和SpringBoot的整合使用
- 主要使用參考第二節(jié)
- 核心使用參考第一篇文章
文章難度:四顆星
代碼不難,重點(diǎn)是封裝的思想需要體會
不同觀點(diǎn)歡迎大家在評論區(qū)一起討論學(xué)習(xí),沒有對錯之分,每個系統(tǒng)業(yè)務(wù)特性不同,適合系統(tǒng)的才是最好的~
源碼地址:https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只會說明核心代碼,其他的基礎(chǔ)整合配置和多環(huán)境自動隔離參考源碼即可
一、為什么要二次封裝
為了不產(chǎn)生歧義,文章中提到的二次封裝均是基于原始使用方式的封裝,而非源碼級別的二次封裝
換句話說:如果都需要對源碼進(jìn)行封裝了,那么說明公司業(yè)務(wù)規(guī)模都到一定程度了,二次封裝這種東西已經(jīng)不需要討論了,封裝已經(jīng)是一個共識
- 首先明確一點(diǎn):不進(jìn)行二次封裝完全不影響RocketMQ的使用,可以選擇二次封裝和不選擇二次封裝
- 二次封裝可以提供更多的功能和更簡潔的使用方式
- 如果一個封裝搞得比原始使用方式更復(fù)雜,那么就失去了二次封裝的意義
- Q1:二次封裝可不可以不要?完全可以,完全不影響正常使用
- Q2:二次封裝有沒有必要?仁者見仁智者見智的問題,如果覺得沒有必要那么這篇文章可以跳過~
- ORM框架中典型的一個二次封裝框架就是MyBatisPlus(簡稱MP),后者是對MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那為什么要用二次封裝后的MP?
- 場景:大部分的數(shù)據(jù)庫操作,無外乎CRUD,那么最常用的比如(根據(jù)名稱就可以知道這個方法做什么用,就沒有必要再二次說明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。對于上面這5個操作,變化的只是表字段和表名,剩下的語法都是一樣
- 不封裝:直接使用MyBatis完全可以自己實(shí)現(xiàn)上面方法的功能,但是每個表都需要寫一遍自己上面的方法,假設(shè)有100張表,那么就會多出495個(下面說明)重復(fù)功能代碼,而且所有代碼都是冗余的
- 封裝后:由封裝者提供上面5個方法的公共實(shí)現(xiàn),然后所有需要使用上面功能的Service只需要繼承封裝好的類就自然的擁有了上面的5大功能,那么代碼的冗余量就從100張表*5個方法==500,去掉封裝的5個,節(jié)省了495的代碼冗余量
- 所以二次封裝是為了更方便用、更簡潔、更加適用于系統(tǒng),量身打造可以大大提升開發(fā)效率。就如上面的5個方法,完全重復(fù)性的東西為什么要浪費(fèi)開發(fā)時間來做這些冗余的事情呢?
1.1 二次封裝不同觀點(diǎn)
讓我們以一個生活中的蛋炒飯開個頭
原始框架好比提供了原材料:廚具、雞蛋,米飯等食材、菜譜
- 對于框架的使用通常有以下兩種方式
- 第一種:根據(jù)菜譜來進(jìn)行做飯(使用原生的方法調(diào)用),洗菜、做飯、刷碗、打掃不管啥入?yún)⒆约汗?/li>
- 第二種:找一個人來學(xué)會這道菜(負(fù)責(zé)二次封裝的人)的多種做法(封裝大部分業(yè)務(wù)場景)并做成一種點(diǎn)餐式的服務(wù),誰想吃哪種類型的蛋炒飯直接點(diǎn)餐(調(diào)用封裝好的方法)就可以吃上香噴噴的蛋炒飯
問題:哪種方案更好? 答案:兩種各有各的優(yōu)勢(在說廢話,哈哈~)
- 第一種:原生方式
- 優(yōu)點(diǎn):可以按照各種方式靈活調(diào)用,比如每個人都使用RocketMQTemplate原生send發(fā)送消息,想要發(fā)送什么類型的消息就發(fā)送什么類型的消息,比較隨意
- 缺點(diǎn):代碼大量冗余,從構(gòu)建參數(shù)對象、發(fā)送消息、消費(fèi)消息、異常處理、日志記錄、異常重試啥啥的都是自己搞,每個消費(fèi)隊(duì)列就會出現(xiàn)上面所有的步驟。比如現(xiàn)在有一個訂單處理中心A接收來自各種類型訂單,此時如B、C兩個原始訂單來源想讓A處理訂單,那么B和C都需要按照A的要求進(jìn)行調(diào)用,代碼會冗余
- 第二種:點(diǎn)餐式服務(wù)
- 優(yōu)點(diǎn):封裝了大部分統(tǒng)一的方法調(diào)用,比如 發(fā)送消息、異常處理、日志記錄等等都是重復(fù)的,封裝后點(diǎn)餐的人不需要再關(guān)系這一部分要怎么處理,只需要告訴點(diǎn)餐服務(wù)要不要進(jìn)行異常、要不異常重試等等,那么此時對于點(diǎn)餐人來說只需要 付錢(調(diào)用服務(wù))、吃飯(消費(fèi)消息),除此之外啥也不用管,全部由點(diǎn)餐服務(wù)提供者完成所有上述兩個步驟外的其它操作
- 缺點(diǎn):無法滿足所有點(diǎn)餐人的要求,有的人喜歡味道重一點(diǎn),有的喜歡味道淡一點(diǎn)。但是這個缺點(diǎn)完全可以處理,比如點(diǎn)餐服務(wù)提供了自定義廚房(返回原始發(fā)送對象),此時調(diào)用者可以按照第一種方式進(jìn)行使用
- 選哪種?
- 個人而言:業(yè)務(wù)系統(tǒng)復(fù)雜的優(yōu)先選擇第二種 ,簡單業(yè)務(wù)的選擇第一種(盡量采用封裝,后續(xù)維護(hù)方便)。對于一個復(fù)雜的系統(tǒng),本身業(yè)務(wù)級的代碼就已經(jīng)很多了,結(jié)果還要每個人處理全部一樣的東西,消費(fèi)者越多代碼冗余越多。如果一個系統(tǒng)只是為了使用MQ來進(jìn)行業(yè)務(wù)分離,消費(fèi)者也不多,那么可以選擇最快的方式,但是最終會選擇第二種,如果業(yè)務(wù)隨著時間增長越復(fù)雜,越晚改成第二種花費(fèi)的代價越大!
- 第一種就好比此時我們要直接操作內(nèi)存,原生操作就好比C++或C,可以直接操作內(nèi)存,但是同時用完后還要自己寫各種異常處理和釋放內(nèi)存;代碼封裝就好比Java,我們只需要告訴Java我們要使用內(nèi)存,然后用完就不用管
- 企業(yè)中,業(yè)務(wù)功能產(chǎn)出是一級優(yōu)先級,在此之上才能有更高級的東西。技術(shù)服務(wù)于業(yè)務(wù),而不是業(yè)務(wù)服務(wù)于技術(shù)!比如現(xiàn)在30個人的系統(tǒng),我們要使用緩存加速訪問,那么我們是選擇 內(nèi)部緩存(直接用集合或者map存起來)還是用Redis?
- 內(nèi)部緩存和Redis能不能達(dá)到目的?能
- 哪個更方便更快?內(nèi)部緩存!內(nèi)部對象就很快實(shí)現(xiàn)
- 如果業(yè)務(wù)發(fā)展遲早會轉(zhuǎn)為Redis這種專業(yè)的緩存中間件,就好比業(yè)務(wù)發(fā)展前第一種,業(yè)務(wù)發(fā)展后選擇第二種,但是對于大部分業(yè)務(wù)系統(tǒng)來說功能增加是很快的,特別是產(chǎn)品同事上一分鐘提需求下一分鐘就要上線這種(開個玩笑~),所以我們在引用一個技術(shù)需不需要進(jìn)行二次封裝時需要技術(shù)負(fù)責(zé)人對業(yè)務(wù)增長有一個預(yù)判。建議是都進(jìn)行封裝一下
1.2 封裝的抽離點(diǎn)
- 對于二次封裝,其中最主要的就是找出該框架在日常使用中所出現(xiàn)的大部分涉及到的操作,然后找出變化操作和不變化操作
- RocketMQ日常使用主要場景為例:
- 發(fā)送消息階段:準(zhǔn)備需要發(fā)送的消息、發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
- 消費(fèi)消息階段:記錄接收消息日志、業(yè)務(wù)處理、業(yè)務(wù)日志記錄、異常處理、異常重試、異常通知、死信處理
- 提取變化點(diǎn)和不變化點(diǎn)(可以抽取為公共處理的場景)
- 發(fā)送消息階段:
- 變化點(diǎn):準(zhǔn)備需要發(fā)送的消息
- 不變化點(diǎn):發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
- 消費(fèi)消息階段
- 變化點(diǎn):業(yè)務(wù)處理、業(yè)務(wù)日志記錄
- 不變化點(diǎn):記錄接收消息日志、異常處理、異常重試、異常通知、死信處理
- 發(fā)送消息階段:
- 從上可以看到,對于RocketMQ的使用,大部分場景都是可以抽離成一個公共的方法處理,只有業(yè)務(wù)級的需要自己處理,所以如果我們把不變化場景抽取后,每個同事只需要寫自己業(yè)務(wù)相關(guān)部分即可
- 抽取后的復(fù)雜度:對于新加一個消費(fèi)者,只需要處理業(yè)務(wù)相關(guān)三個場景(準(zhǔn)備需要發(fā)送的消息、業(yè)務(wù)處理、業(yè)務(wù)日志記錄),剩下的九個場景,只需要封裝一次就可以。需要現(xiàn)在就幾十個消費(fèi)者,可以想想一些減少了多少代碼冗余
1.3 設(shè)計(jì)模式的應(yīng)用
- 要封裝出一個好的抽象層,【設(shè)計(jì)模式】建議好好體會和學(xué)習(xí)一下
- 設(shè)計(jì)模式對于用不到的人來說比較虛幻,對于用的到的人來說,這個真牛X
二、二次封裝核心要點(diǎn)
2.1 二次封裝核心點(diǎn)
2.1.1 封裝主要討論點(diǎn)
- 對于RocketMQ或者說對于整個MQ體系來說(不管是RabbitMQ、RocketMQ、Kafka)等封裝的核心主要有兩個:發(fā)送消息、消費(fèi)消息者兩個場景
- 對于RocketMQ我們主要討論三個地方:RocketMQTemplate封裝、RocketMQListener封裝和廣播消息的封裝
- 廣播消息是分布式系統(tǒng)中同時讓所有節(jié)點(diǎn)都干一件事情的一個好的方式,如果用不到忽略廣播消息即可
2.1.2 發(fā)送/消費(fèi)的幾種消息實(shí)體
- RocketMQ發(fā)送消息對于不同的使用來說,大部分選擇下面的幾種發(fā)送消息類型
- A、發(fā)送Json對象,比如Fastjson的JSONObject
- B、直接發(fā)送轉(zhuǎn)Json后的String對象
- C、根據(jù)業(yè)務(wù)封裝對應(yīng)實(shí)體類
- D、直接使用原生MessageExt接收
- 怎么選擇?怎么選擇才是最優(yōu)?
- 上面哪一種都可以達(dá)到目的,如果要統(tǒng)一封裝就必須要有一個標(biāo)準(zhǔn)
- 怎么選擇只需要回答這個問題:在不看消息發(fā)送者的情況下,消費(fèi)者怎么知道發(fā)送者發(fā)送的消息含義?
- 比如現(xiàn)在有一個訂單消息,如果我們不看消息發(fā)送者,怎么知道發(fā)送者給消費(fèi)者發(fā)送哪些字段
- A、B、D可以嗎?一定不可以!JSON對象和String對象,如果我們不看消息發(fā)送者不可能知道到底發(fā)送了啥,這點(diǎn)我相信沒有可以討論的地方,因?yàn)轭愋蜎Q定了這個操作不可能
- C可以嗎?可以!此時不需要看消息發(fā)送者,只需要看消費(fèi)者的實(shí)體類點(diǎn)進(jìn)去,有哪些業(yè)務(wù)字段一清二楚
- 可能有杠要抬了,有看實(shí)體類的功夫,我看消息發(fā)送者都看完了
- 靈魂拷問1:如果消息發(fā)送者和消費(fèi)者不在一個系統(tǒng)怎么看?邪魅一笑,不同業(yè)務(wù)線可能沒代碼權(quán)限吧?分布式系統(tǒng)完全獨(dú)立可能吧?
- 靈魂拷問2:如果現(xiàn)在需要一個功能,如果某些必須要的字段消息發(fā)送者如果沒有給的話需要校驗(yàn),普通String和JSONObject怎么實(shí)現(xiàn)?換成實(shí)體類呢?
- 基于上述討論點(diǎn),封裝建議基于實(shí)體類來,實(shí)體類不管是排查問題、新人熟悉系統(tǒng)代碼、信息校驗(yàn)等String和JSONObject無法像實(shí)體類一樣輕松勝任
2.2 RocketMQTemplate封裝
2.2.1 封裝基礎(chǔ)實(shí)體類
- 基礎(chǔ)消息實(shí)體類封裝了除了業(yè)務(wù)消息外所有其他公共字段,主要看下面代碼中的字段和注釋
- 基礎(chǔ)抽象消息實(shí)體,包含基礎(chǔ)的消息、根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
- 其中也可以包含所有消費(fèi)者可能用得到的方法等,比如做些數(shù)據(jù)的加解密
package com.codecoord.rocketmq.domain; import lombok.Data; import java.time.LocalDateTime; import java.util.UUID; /** * 基礎(chǔ)消息實(shí)體,包含基礎(chǔ)的消息 * 根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段 * * @author tianxincoord@163.com * @since 2022/6/16 */ @Data public abstract class BaseMqMessage { /** * 業(yè)務(wù)鍵,用于RocketMQ控制臺查看消費(fèi)情況 */ protected String key; /** * 發(fā)送消息來源,用于排查問題 */ protected String source = ""; /** * 發(fā)送時間 */ protected LocalDateTime sendTime = LocalDateTime.now(); /** * 跟蹤id,用于slf4j等日志記錄跟蹤id,方便查詢業(yè)務(wù)鏈 */ protected String traceId = UUID.randomUUID().toString(); /** * 重試次數(shù),用于判斷重試次數(shù),超過重試次數(shù)發(fā)送異常警告 */ protected Integer retryTimes = 0; }
- 有了此基礎(chǔ)抽象實(shí)體類,那么剩下的所有業(yè)務(wù)消息實(shí)體只需要繼承此基類,然后在自己業(yè)務(wù)類中包含自己需要的字段即可,因?yàn)檫@些公共字段不管是向上轉(zhuǎn)型還是向下轉(zhuǎn)型,子類和父類都可以看得到
2.2.2 RocketMQTemplate
- RocketMQTemplate發(fā)送消息的代碼如果不封裝,我們發(fā)送消息需要這樣
- String destination = topic + ":" + tag;
- template.syncSend(destination, message);
- 每個人發(fā)送消息都要自己處理這個冒號,直接傳入topic和tag不香嗎?按照抽離變化點(diǎn)中的變化點(diǎn),只有消息是變化的,除此之外的其他規(guī)則交給封裝類
- RocketMQTemplate主要封裝發(fā)送消息的日志、異常的處理、消息key設(shè)置、等等其他配置
- 封裝代碼類如下,下面包含了主要發(fā)送方式,更多自己添加即可
- 這里就是消息發(fā)送的點(diǎn)餐機(jī)器,同時也提供了封裝方法也提供原始RocketMQTemplate供使用
- 此處只是提供一種方式,生產(chǎn)中按照項(xiàng)目組商量決定
package com.codecoord.rocketmq.template; import com.alibaba.fastjson.JSONObject; import com.codecoord.rocketmq.constant.RocketMqSysConstant; import com.codecoord.rocketmq.domain.BaseMqMessage; import com.codecoord.rocketmq.util.JsonUtil; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * RocketMQ模板類 * * @author tianxincoord@163.com * @since 2022/4/15 */ @Component public class RocketMqTemplate { private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class); @Resource(name = "rocketMQTemplate") private RocketMQTemplate template; /** * 獲取模板,如果封裝的方法不夠提供原生的使用方式 */ public RocketMQTemplate getTemplate() { return template; } /** * 構(gòu)建目的地 */ public String buildDestination(String topic, String tag) { return topic + RocketMqSysConstant.DELIMITER + tag; } /** * 發(fā)送同步消息 */ public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(topic + RocketMqSysConstant.DELIMITER + tag, message); } public <T extends BaseMqMessage> SendResult send(String destination, T message) { // 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理 // 更多的其它基礎(chǔ)業(yè)務(wù)處理... Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集 LOGGER.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 發(fā)送延遲消息 */ public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) { return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel); } public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) { Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); LOGGER.info("[{}]延遲等級[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult)); return sendResult; } }
- 這個類是最基礎(chǔ)的原始封裝類,相當(dāng)于餐館提供的點(diǎn)餐服務(wù)。上面提供無業(yè)務(wù)特性的發(fā)送,比如想要發(fā)送日志消息或者動態(tài)發(fā)送消息目的場景
3.2.3 增強(qiáng)RocketMQTemplate
- 以訂單處理中心來說,變化點(diǎn)僅僅只是單號等業(yè)務(wù)數(shù)據(jù)不一樣,發(fā)往訂單處理中心的消息不管是topic還是tag等等其實(shí)完全都一樣,那么此時可以根據(jù)業(yè)務(wù)來增加封裝
- 增強(qiáng)原始功能需要注意下面兩個點(diǎn)
- 所有父類能出現(xiàn)的地方,子類都能出現(xiàn):也就是子類擁有功能 >= 父類 ,比如Java的List,只要入?yún)⑹荓ist的地方,傳ArrayList和LinkedList完全可以
- 增強(qiáng)功能不能改變原始功能的行為:比如父類有一個方法say是說話,結(jié)果子類覆寫了say改成了行為是吃飯,然后當(dāng)調(diào)用者調(diào)用say的時候得到了一個完全預(yù)期外的結(jié)果
- 就以訂單中心消息發(fā)送為例,封裝OrderMessageTemplate繼承自RocketMqTemplate,此時前者就擁有了封裝父類的所有基礎(chǔ)方法,擁有了所有父類的功能。然后可以在前者增加自身業(yè)務(wù)特性的發(fā)送方法,比如發(fā)送訂單處理消息
package com.codecoord.rocketmq.template; import com.codecoord.rocketmq.constant.RocketMqBizConstant; import com.codecoord.rocketmq.domain.RocketMqEntityMessage; import org.apache.rocketmq.client.producer.SendResult; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.validation.constraints.NotNull; import java.time.LocalDate; import java.time.LocalDateTime; /** * 訂單類發(fā)送消息模板工具類 * * @author tianxincode@163.com * @since 2022/6/16 */ @Component public class OrderMessageTemplate extends RocketMqTemplate { /// 如果不采用繼承也可以直接注入使用 /* @Resource private RocketMqTemplate rocketMqTemplate; */ /** * 入?yún)⒅恍枰獋魅胧悄膫€訂單號和業(yè)務(wù)體消息即可,其他操作根據(jù)需要處理 * 這樣對于調(diào)用者而言,可以更加簡化調(diào)用 */ public SendResult sendOrderPaid(@NotNull String orderId, String body) { RocketMqEntityMessage message = new RocketMqEntityMessage(); message.setKey(orderId); message.setSource("訂單支付"); message.setMessage(body); // 這兩個字段只是為了測試 message.setBirthday(LocalDate.now()); message.setTradeTime(LocalDateTime.now()); return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message); } }
- 此時對于調(diào)用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息發(fā)送到訂單處理中心
- 封裝后的好處,假如現(xiàn)在有10個訂單來源,現(xiàn)在需要調(diào)整消息發(fā)送格式,如果不進(jìn)行封裝那么10個來源發(fā)送的地方都需要改;如果進(jìn)行了二次封裝,只需要改sendOrderPaid方法即可,而且還不會出錯,此時優(yōu)勢就體現(xiàn)出來了
2.3 RocketMQListener封裝
- RocketMQListener是消費(fèi)消息的核心,同時也涉及到更多的操作,比如:基礎(chǔ)日志記錄、異常處理、消息重試、警告通知等等等
- 按照抽離變化點(diǎn),RocketMQListener只應(yīng)該處理與自身業(yè)務(wù)相關(guān)的,除此之外的其它應(yīng)該交給父類,子類只需要告訴父類要不要異常處理、要不要重試等等,點(diǎn)餐式服務(wù)
- 封裝消息消費(fèi)的抽象類
- 注意泛型限定為標(biāo)準(zhǔn)基礎(chǔ)消息類,這樣能到消費(fèi)者的一定有統(tǒng)一的標(biāo)準(zhǔn)類BaseMqMessage
- 下面簡單封裝示例
package com.codecoord.rocketmq.listener; import com.codecoord.rocketmq.constant.RocketMqSysConstant; import com.codecoord.rocketmq.domain.BaseMqMessage; import com.codecoord.rocketmq.template.RocketMqTemplate; import com.codecoord.rocketmq.util.JsonUtil; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import javax.annotation.Resource; import java.time.Instant; import java.util.Objects; /** * 抽象消息監(jiān)聽器,封裝了所有公共處理業(yè)務(wù),如 * 1、基礎(chǔ)日志記錄 * 2、異常處理 * 3、消息重試 * 4、警告通知 * 5、.... * * @author tianxincoord@163.com * @since 2022/4/17 */ public abstract class BaseMqMessageListener<T extends BaseMqMessage> { /** * 這里的日志記錄器是哪個子類的就會被哪個子類的類進(jìn)行初始化 */ protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RocketMqTemplate rocketMqTemplate; /** * 消息者名稱 * * @return 消費(fèi)者名稱 */ protected abstract String consumerName(); /** * 消息處理 * * @param message 待處理消息 * @throws Exception 消費(fèi)異常 */ protected abstract void handleMessage(T message) throws Exception; /** * 超過重試次數(shù)消息,需要啟用isRetry * * @param message 待處理消息 */ protected abstract void overMaxRetryTimesMessage(T message); /** * 是否過濾消息,例如某些 * * @param message 待處理消息 * @return true: 本次消息被過濾,false:不過濾 */ protected boolean isFilter(T message) { return false; } /** * 是否異常時重復(fù)發(fā)送 * * @return true: 消息重試,false:不重試 */ protected abstract boolean isRetry(); /** * 消費(fèi)異常時是否拋出異常 * * @return true: 拋出異常,false:消費(fèi)異常(如果沒有開啟重試則消息會被自動ack) */ protected abstract boolean isThrowException(); /** * 最大重試次數(shù) * * @return 最大重試次數(shù),默認(rèn)10次 */ protected int maxRetryTimes() { return 10; } /** * isRetry開啟時,重新入隊(duì)延遲時間 * * @return -1:立即入隊(duì)重試 */ protected int retryDelayLevel() { return -1; } /** * 由父類來完成基礎(chǔ)的日志和調(diào)配,下面的只是提供一個思路 */ public void dispatchMessage(T message) { MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId()); // 基礎(chǔ)日志記錄被父類處理了 logger.info("[{}]消費(fèi)者收到消息[{}]", consumerName(), JsonUtil.toJson(message)); if (isFilter(message)) { logger.info("消息不滿足消費(fèi)條件,已過濾"); return; } // 超過最大重試次數(shù)時調(diào)用子類方法處理 if (message.getRetryTimes() > maxRetryTimes()) { overMaxRetryTimesMessage(message); return; } try { long start = Instant.now().toEpochMilli(); handleMessage(message); long end = Instant.now().toEpochMilli(); logger.info("消息消費(fèi)成功,耗時[{}ms]", (end - start)); } catch (Exception e) { logger.error("消息消費(fèi)異常", e); // 是捕獲異常還是拋出,由子類決定 if (isThrowException()) { throw new RuntimeException(e); } if (isRetry()) { // 獲取子類RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(annotation)) { message.setSource(message.getSource() + "消息重試"); message.setRetryTimes(message.getRetryTimes() + 1); SendResult sendResult; try { // 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時不走延遲消息) // 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息 sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel()); } catch (Exception ex) { throw new RuntimeException(ex); } // 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { throw new RuntimeException("重試消息發(fā)送失敗"); } } } } } }
- 封裝消費(fèi)最終類
- 注意:收到的消息是先委派給父類,父類進(jìn)行調(diào)度管理
package com.codecoord.rocketmq.listener; import com.codecoord.rocketmq.constant.RocketMqBizConstant; import com.codecoord.rocketmq.domain.RocketMqEntityMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 實(shí)體類消費(fèi)監(jiān)聽器,在實(shí)現(xiàn)RocketMQListener中間還加了一層BaseMqMessageListener來處理基礎(chǔ)業(yè)務(wù)消息 * * @author tianxincoord@163.com * @since 2022/5/12 */ @Slf4j @Component @RocketMQMessageListener( topic = RocketMqBizConstant.SOURCE_TOPIC, consumerGroup = RocketMqBizConstant.SOURCE_GROUP, selectorExpression = RocketMqBizConstant.SOURCE_TAG, // 指定消費(fèi)者線程數(shù),默認(rèn)64,生產(chǎn)中請注意配置,避免過大或者過小 consumeThreadMax = 5 ) public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage> implements RocketMQListener<RocketMqEntityMessage> { /** * 此處只是說明封裝的思想,更多還是要根據(jù)業(yè)務(wù)操作決定 * 內(nèi)功心法有了,無論什么招式都可以發(fā)揮最大威力 */ @Override protected String consumerName() { return "RocketMQ二次封裝消息消費(fèi)者"; } @Override public void onMessage(RocketMqEntityMessage message) { // 注意,此時這里沒有直接處理業(yè)務(wù),而是先委派給父類做基礎(chǔ)操作,然后父類做完基礎(chǔ)操作后會調(diào)用子類的實(shí)際處理類型 super.dispatchMessage(message); } @Override protected void handleMessage(RocketMqEntityMessage message) throws Exception { // 此時這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試 System.out.println("業(yè)務(wù)消息處理"); } @Override protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) { // 當(dāng)超過指定重試次數(shù)消息時此處方法會被調(diào)用 // 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作 } @Override protected boolean isRetry() { return false; } @Override protected int maxRetryTimes() { // 指定需要的重試次數(shù),超過重試次數(shù)overMaxRetryTimesMessage會被調(diào)用 return 5; } @Override protected boolean isThrowException() { // 是否拋出異常,到消費(fèi)異常時是被父類攔截處理還是直接拋出異常 return false; } }
- 封裝后對于子類來說,只需要告訴父類要不要做就擁有了最開始說的所有功能,簡化了使用,此時子類消費(fèi)者只需要專注于自己的業(yè)務(wù)核心處理就可以了
2.4 廣播消息的應(yīng)用場景
- 應(yīng)用場景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
- 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時候,此時可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無需一個節(jié)點(diǎn)一個節(jié)點(diǎn)的處理
- 特別說明:廣播消息默認(rèn)會在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的,所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋,否則啟動的時候就會提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個問題
package com.codecoord.rocketmq.listener; import com.codecoord.rocketmq.constant.RocketMqBizConstant; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 廣播消息 * 應(yīng)用場景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息 * 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時候 * 此時可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無需一個節(jié)點(diǎn)一個節(jié)點(diǎn)的處理 * * 特別說明:廣播消息默認(rèn)會在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876 * 這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的 * 所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋 * 否則啟動的時候就會提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個問題 * * @author tianxincoord@163.com * @since 2022/5/12 */ @Slf4j @Component @RocketMQMessageListener( topic = RocketMqBizConstant.SOURCE_TOPIC, consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP, selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG // messageModel = MessageModel.BROADCASTING ) public class RocketBroadcastingListener implements RocketMQListener<MessageExt> { /** * MessageExt:內(nèi)置的消息實(shí)體,生產(chǎn)中根據(jù)需要自己封裝實(shí)體 */ @Override public void onMessage(MessageExt message) { log.info("收到廣播消息【{}】", new String(message.getBody())); } }
2.3 代碼封裝完結(jié)測試
封裝測試大家可以直接參考RocketMqController即可
package com.codecoord.rocketmq.controller; import com.alibaba.fastjson.JSONObject; import com.codecoord.rocketmq.constant.RocketMqBizConstant; import com.codecoord.rocketmq.domain.RocketMqEntityMessage; import com.codecoord.rocketmq.template.OrderMessageTemplate; import com.codecoord.rocketmq.template.RocketMqTemplate; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.UUID; /** * 消息發(fā)送 * * @author tianxin01@huice.com * @since 2022/6/16 */ @RestController @RequestMapping("/rocketmq") @Slf4j public class RocketMqController { /** * 注意此處注入的是封裝的RocketMqTemplate */ @Resource private RocketMqTemplate rocketMqTemplate; /** * 注入對應(yīng)業(yè)務(wù)的模板類 */ @Resource private OrderMessageTemplate orderMessageTemplate; /** * 通過實(shí)體類發(fā)送消息,發(fā)送注意事項(xiàng)請參考實(shí)體類 * 說明:也可以在RocketMqTemplate按照業(yè)務(wù)封裝發(fā)送方法,這樣只需要調(diào)用方法指定基礎(chǔ)業(yè)務(wù)消息接口 */ @RequestMapping("/entity/message") public Object sendMessage() { RocketMqEntityMessage message = new RocketMqEntityMessage(); // 設(shè)置業(yè)務(wù)key message.setKey(UUID.randomUUID().toString()); // 設(shè)置消息來源,便于查詢we年 message.setSource("封裝測試"); // 業(yè)務(wù)消息內(nèi)容 message.setMessage("當(dāng)前消息發(fā)送時間為:" + LocalDateTime.now()); // Java時間字段需要單獨(dú)處理,否則會序列化失敗 message.setBirthday(LocalDate.now()); message.setTradeTime(LocalDateTime.now()); return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message); } /** * 此時對于調(diào)用者而且,無需創(chuàng)建任何類 * 如果某天需要調(diào)整消息發(fā)送來源,如果不封裝,所有原來產(chǎn)生message的地方全部改 * 如果封裝了,只需要改sendOrderPaid就可以切換 */ @RequestMapping("/order/paid") public Object sendOrderPaidMessage() { return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客戶下單了...,快快備貨"); } /** * 直接將對象進(jìn)行傳輸,也可以自己進(jìn)行json轉(zhuǎn)化后傳輸 */ @RequestMapping("/messageExt/message") public SendResult convertAndSend() { // 生產(chǎn)中不推薦使用jsonObject傳遞,不看發(fā)送者無法知道傳遞的消息包含什么信息 JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "messageExt"); String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG); // 如果要走內(nèi)部方法發(fā)送則必須要按照標(biāo)準(zhǔn)來,否則就使用原生的消息發(fā)送 return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject); } }
到此這篇關(guān)于RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級二次封裝的文章就介紹到這了,更多相關(guān)RocketMQ SpringBoot封裝內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot Starter依賴原理與實(shí)例詳解
SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動掃描到要加載的信息并啟動相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫的處理,需要配置各種信息的困擾2022-09-09mybatis水平分表實(shí)現(xiàn)動態(tài)表名的項(xiàng)目實(shí)例
本文主要介紹了mybatis水平分表實(shí)現(xiàn)動態(tài)表名的項(xiàng)目實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07JavaWeb利用struts實(shí)現(xiàn)文件下載時改變文件名稱
這篇文章主要為大家詳細(xì)介紹了JavaWeb利用struts實(shí)現(xiàn)文件下載時改變文件名稱的相關(guān)資料,需要的朋友可以參考下2016-06-06Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能
這篇文章主要介紹了Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱
緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動時,預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,下面我們就來看看SpringBoot是如何實(shí)現(xiàn)緩存預(yù)熱的吧2024-01-01Mockito mock Kotlin Object類方法報錯解決方法
這篇文章主要介紹了Mockito mock Kotlin Object類方法報錯解決方法,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-09-09理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的
今天通過本文帶領(lǐng)大家學(xué)習(xí)注解的基礎(chǔ)知識,學(xué)習(xí)Spring的@Autowired是怎么實(shí)現(xiàn)的,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-07-07Redis 集成Spring的示例代碼(spring-data-redis)
本篇文章主要介紹了Redis 集成Spring的示例代碼(spring-data-redis) ,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09