RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝
前言說(shuō)明
前置掌握:SpringBoot基礎(chǔ)使用、RocketMQ和SpringBoot的整合使用
- 主要使用參考第二節(jié)
- 核心使用參考第一篇文章
文章難度:四顆星
代碼不難,重點(diǎn)是封裝的思想需要體會(huì)
不同觀點(diǎn)歡迎大家在評(píng)論區(qū)一起討論學(xué)習(xí),沒(méi)有對(duì)錯(cuò)之分,每個(gè)系統(tǒng)業(yè)務(wù)特性不同,適合系統(tǒng)的才是最好的~
源碼地址:https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只會(huì)說(shuō)明核心代碼,其他的基礎(chǔ)整合配置和多環(huán)境自動(dòng)隔離參考源碼即可

一、為什么要二次封裝
為了不產(chǎn)生歧義,文章中提到的二次封裝均是基于原始使用方式的封裝,而非源碼級(jí)別的二次封裝
換句話說(shuō):如果都需要對(duì)源碼進(jìn)行封裝了,那么說(shuō)明公司業(yè)務(wù)規(guī)模都到一定程度了,二次封裝這種東西已經(jīng)不需要討論了,封裝已經(jīng)是一個(gè)共識(shí)
- 首先明確一點(diǎn):不進(jìn)行二次封裝完全不影響RocketMQ的使用,可以選擇二次封裝和不選擇二次封裝
- 二次封裝可以提供更多的功能和更簡(jiǎn)潔的使用方式
- 如果一個(gè)封裝搞得比原始使用方式更復(fù)雜,那么就失去了二次封裝的意義
- Q1:二次封裝可不可以不要?完全可以,完全不影響正常使用
- Q2:二次封裝有沒(méi)有必要?仁者見(jiàn)仁智者見(jiàn)智的問(wèn)題,如果覺(jué)得沒(méi)有必要那么這篇文章可以跳過(guò)~
- ORM框架中典型的一個(gè)二次封裝框架就是MyBatisPlus(簡(jiǎn)稱MP),后者是對(duì)MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那為什么要用二次封裝后的MP?
- 場(chǎng)景:大部分的數(shù)據(jù)庫(kù)操作,無(wú)外乎CRUD,那么最常用的比如(根據(jù)名稱就可以知道這個(gè)方法做什么用,就沒(méi)有必要再二次說(shuō)明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。對(duì)于上面這5個(gè)操作,變化的只是表字段和表名,剩下的語(yǔ)法都是一樣
- 不封裝:直接使用MyBatis完全可以自己實(shí)現(xiàn)上面方法的功能,但是每個(gè)表都需要寫(xiě)一遍自己上面的方法,假設(shè)有100張表,那么就會(huì)多出495個(gè)(下面說(shuō)明)重復(fù)功能代碼,而且所有代碼都是冗余的
- 封裝后:由封裝者提供上面5個(gè)方法的公共實(shí)現(xiàn),然后所有需要使用上面功能的Service只需要繼承封裝好的類就自然的擁有了上面的5大功能,那么代碼的冗余量就從100張表*5個(gè)方法==500,去掉封裝的5個(gè),節(jié)省了495的代碼冗余量
- 所以二次封裝是為了更方便用、更簡(jiǎn)潔、更加適用于系統(tǒng),量身打造可以大大提升開(kāi)發(fā)效率。就如上面的5個(gè)方法,完全重復(fù)性的東西為什么要浪費(fèi)開(kāi)發(fā)時(shí)間來(lái)做這些冗余的事情呢?
1.1 二次封裝不同觀點(diǎn)
讓我們以一個(gè)生活中的蛋炒飯開(kāi)個(gè)頭
原始框架好比提供了原材料:廚具、雞蛋,米飯等食材、菜譜
- 對(duì)于框架的使用通常有以下兩種方式
- 第一種:根據(jù)菜譜來(lái)進(jìn)行做飯(使用原生的方法調(diào)用),洗菜、做飯、刷碗、打掃不管啥入?yún)⒆约汗?/li>
- 第二種:找一個(gè)人來(lái)學(xué)會(huì)這道菜(負(fù)責(zé)二次封裝的人)的多種做法(封裝大部分業(yè)務(wù)場(chǎng)景)并做成一種點(diǎn)餐式的服務(wù),誰(shuí)想吃哪種類型的蛋炒飯直接點(diǎn)餐(調(diào)用封裝好的方法)就可以吃上香噴噴的蛋炒飯
問(wèn)題:哪種方案更好? 答案:兩種各有各的優(yōu)勢(shì)(在說(shuō)廢話,哈哈~)
- 第一種:原生方式
- 優(yōu)點(diǎn):可以按照各種方式靈活調(diào)用,比如每個(gè)人都使用RocketMQTemplate原生send發(fā)送消息,想要發(fā)送什么類型的消息就發(fā)送什么類型的消息,比較隨意
- 缺點(diǎn):代碼大量冗余,從構(gòu)建參數(shù)對(duì)象、發(fā)送消息、消費(fèi)消息、異常處理、日志記錄、異常重試啥啥的都是自己搞,每個(gè)消費(fèi)隊(duì)列就會(huì)出現(xiàn)上面所有的步驟。比如現(xiàn)在有一個(gè)訂單處理中心A接收來(lái)自各種類型訂單,此時(shí)如B、C兩個(gè)原始訂單來(lái)源想讓A處理訂單,那么B和C都需要按照A的要求進(jìn)行調(diào)用,代碼會(huì)冗余
- 第二種:點(diǎn)餐式服務(wù)
- 優(yōu)點(diǎn):封裝了大部分統(tǒng)一的方法調(diào)用,比如 發(fā)送消息、異常處理、日志記錄等等都是重復(fù)的,封裝后點(diǎn)餐的人不需要再關(guān)系這一部分要怎么處理,只需要告訴點(diǎn)餐服務(wù)要不要進(jìn)行異常、要不異常重試等等,那么此時(shí)對(duì)于點(diǎn)餐人來(lái)說(shuō)只需要 付錢(qián)(調(diào)用服務(wù))、吃飯(消費(fèi)消息),除此之外啥也不用管,全部由點(diǎn)餐服務(wù)提供者完成所有上述兩個(gè)步驟外的其它操作
- 缺點(diǎn):無(wú)法滿足所有點(diǎn)餐人的要求,有的人喜歡味道重一點(diǎn),有的喜歡味道淡一點(diǎn)。但是這個(gè)缺點(diǎn)完全可以處理,比如點(diǎn)餐服務(wù)提供了自定義廚房(返回原始發(fā)送對(duì)象),此時(shí)調(diào)用者可以按照第一種方式進(jìn)行使用
- 選哪種?
- 個(gè)人而言:業(yè)務(wù)系統(tǒng)復(fù)雜的優(yōu)先選擇第二種 ,簡(jiǎn)單業(yè)務(wù)的選擇第一種(盡量采用封裝,后續(xù)維護(hù)方便)。對(duì)于一個(gè)復(fù)雜的系統(tǒng),本身業(yè)務(wù)級(jí)的代碼就已經(jīng)很多了,結(jié)果還要每個(gè)人處理全部一樣的東西,消費(fèi)者越多代碼冗余越多。如果一個(gè)系統(tǒng)只是為了使用MQ來(lái)進(jìn)行業(yè)務(wù)分離,消費(fèi)者也不多,那么可以選擇最快的方式,但是最終會(huì)選擇第二種,如果業(yè)務(wù)隨著時(shí)間增長(zhǎng)越復(fù)雜,越晚改成第二種花費(fèi)的代價(jià)越大!
- 第一種就好比此時(shí)我們要直接操作內(nèi)存,原生操作就好比C++或C,可以直接操作內(nèi)存,但是同時(shí)用完后還要自己寫(xiě)各種異常處理和釋放內(nèi)存;代碼封裝就好比Java,我們只需要告訴Java我們要使用內(nèi)存,然后用完就不用管
- 企業(yè)中,業(yè)務(wù)功能產(chǎn)出是一級(jí)優(yōu)先級(jí),在此之上才能有更高級(jí)的東西。技術(shù)服務(wù)于業(yè)務(wù),而不是業(yè)務(wù)服務(wù)于技術(shù)!比如現(xiàn)在30個(gè)人的系統(tǒng),我們要使用緩存加速訪問(wèn),那么我們是選擇 內(nèi)部緩存(直接用集合或者map存起來(lái))還是用Redis?
- 內(nèi)部緩存和Redis能不能達(dá)到目的?能
- 哪個(gè)更方便更快??jī)?nèi)部緩存!內(nèi)部對(duì)象就很快實(shí)現(xiàn)
- 如果業(yè)務(wù)發(fā)展遲早會(huì)轉(zhuǎn)為Redis這種專業(yè)的緩存中間件,就好比業(yè)務(wù)發(fā)展前第一種,業(yè)務(wù)發(fā)展后選擇第二種,但是對(duì)于大部分業(yè)務(wù)系統(tǒng)來(lái)說(shuō)功能增加是很快的,特別是產(chǎn)品同事上一分鐘提需求下一分鐘就要上線這種(開(kāi)個(gè)玩笑~),所以我們?cè)谝靡粋€(gè)技術(shù)需不需要進(jìn)行二次封裝時(shí)需要技術(shù)負(fù)責(zé)人對(duì)業(yè)務(wù)增長(zhǎng)有一個(gè)預(yù)判。建議是都進(jìn)行封裝一下
1.2 封裝的抽離點(diǎn)
- 對(duì)于二次封裝,其中最主要的就是找出該框架在日常使用中所出現(xiàn)的大部分涉及到的操作,然后找出變化操作和不變化操作
- RocketMQ日常使用主要場(chǎng)景為例:
- 發(fā)送消息階段:準(zhǔn)備需要發(fā)送的消息、發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
- 消費(fèi)消息階段:記錄接收消息日志、業(yè)務(wù)處理、業(yè)務(wù)日志記錄、異常處理、異常重試、異常通知、死信處理
- 提取變化點(diǎn)和不變化點(diǎn)(可以抽取為公共處理的場(chǎng)景)
- 發(fā)送消息階段:
- 變化點(diǎn):準(zhǔn)備需要發(fā)送的消息
- 不變化點(diǎn):發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
- 消費(fèi)消息階段
- 變化點(diǎn):業(yè)務(wù)處理、業(yè)務(wù)日志記錄
- 不變化點(diǎn):記錄接收消息日志、異常處理、異常重試、異常通知、死信處理
- 發(fā)送消息階段:
- 從上可以看到,對(duì)于RocketMQ的使用,大部分場(chǎng)景都是可以抽離成一個(gè)公共的方法處理,只有業(yè)務(wù)級(jí)的需要自己處理,所以如果我們把不變化場(chǎng)景抽取后,每個(gè)同事只需要寫(xiě)自己業(yè)務(wù)相關(guān)部分即可
- 抽取后的復(fù)雜度:對(duì)于新加一個(gè)消費(fèi)者,只需要處理業(yè)務(wù)相關(guān)三個(gè)場(chǎng)景(準(zhǔn)備需要發(fā)送的消息、業(yè)務(wù)處理、業(yè)務(wù)日志記錄),剩下的九個(gè)場(chǎng)景,只需要封裝一次就可以。需要現(xiàn)在就幾十個(gè)消費(fèi)者,可以想想一些減少了多少代碼冗余
1.3 設(shè)計(jì)模式的應(yīng)用
- 要封裝出一個(gè)好的抽象層,【設(shè)計(jì)模式】建議好好體會(huì)和學(xué)習(xí)一下
- 設(shè)計(jì)模式對(duì)于用不到的人來(lái)說(shuō)比較虛幻,對(duì)于用的到的人來(lái)說(shuō),這個(gè)真牛X
二、二次封裝核心要點(diǎn)
2.1 二次封裝核心點(diǎn)
2.1.1 封裝主要討論點(diǎn)
- 對(duì)于RocketMQ或者說(shuō)對(duì)于整個(gè)MQ體系來(lái)說(shuō)(不管是RabbitMQ、RocketMQ、Kafka)等封裝的核心主要有兩個(gè):發(fā)送消息、消費(fèi)消息者兩個(gè)場(chǎng)景
- 對(duì)于RocketMQ我們主要討論三個(gè)地方:RocketMQTemplate封裝、RocketMQListener封裝和廣播消息的封裝
- 廣播消息是分布式系統(tǒng)中同時(shí)讓所有節(jié)點(diǎn)都干一件事情的一個(gè)好的方式,如果用不到忽略廣播消息即可
2.1.2 發(fā)送/消費(fèi)的幾種消息實(shí)體
- RocketMQ發(fā)送消息對(duì)于不同的使用來(lái)說(shuō),大部分選擇下面的幾種發(fā)送消息類型
- A、發(fā)送Json對(duì)象,比如Fastjson的JSONObject
- B、直接發(fā)送轉(zhuǎn)Json后的String對(duì)象
- C、根據(jù)業(yè)務(wù)封裝對(duì)應(yīng)實(shí)體類
- D、直接使用原生MessageExt接收
- 怎么選擇?怎么選擇才是最優(yōu)?
- 上面哪一種都可以達(dá)到目的,如果要統(tǒng)一封裝就必須要有一個(gè)標(biāo)準(zhǔn)
- 怎么選擇只需要回答這個(gè)問(wèn)題:在不看消息發(fā)送者的情況下,消費(fèi)者怎么知道發(fā)送者發(fā)送的消息含義?
- 比如現(xiàn)在有一個(gè)訂單消息,如果我們不看消息發(fā)送者,怎么知道發(fā)送者給消費(fèi)者發(fā)送哪些字段
- A、B、D可以嗎?一定不可以!JSON對(duì)象和String對(duì)象,如果我們不看消息發(fā)送者不可能知道到底發(fā)送了啥,這點(diǎn)我相信沒(méi)有可以討論的地方,因?yàn)轭愋蜎Q定了這個(gè)操作不可能
- C可以嗎?可以!此時(shí)不需要看消息發(fā)送者,只需要看消費(fèi)者的實(shí)體類點(diǎn)進(jìn)去,有哪些業(yè)務(wù)字段一清二楚
- 可能有杠要抬了,有看實(shí)體類的功夫,我看消息發(fā)送者都看完了
- 靈魂拷問(wèn)1:如果消息發(fā)送者和消費(fèi)者不在一個(gè)系統(tǒng)怎么看?邪魅一笑,不同業(yè)務(wù)線可能沒(méi)代碼權(quán)限吧?分布式系統(tǒng)完全獨(dú)立可能吧?
- 靈魂拷問(wèn)2:如果現(xiàn)在需要一個(gè)功能,如果某些必須要的字段消息發(fā)送者如果沒(méi)有給的話需要校驗(yàn),普通String和JSONObject怎么實(shí)現(xiàn)?換成實(shí)體類呢?
- 基于上述討論點(diǎn),封裝建議基于實(shí)體類來(lái),實(shí)體類不管是排查問(wèn)題、新人熟悉系統(tǒng)代碼、信息校驗(yàn)等String和JSONObject無(wú)法像實(shí)體類一樣輕松勝任
2.2 RocketMQTemplate封裝
2.2.1 封裝基礎(chǔ)實(shí)體類
- 基礎(chǔ)消息實(shí)體類封裝了除了業(yè)務(wù)消息外所有其他公共字段,主要看下面代碼中的字段和注釋
- 基礎(chǔ)抽象消息實(shí)體,包含基礎(chǔ)的消息、根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
- 其中也可以包含所有消費(fèi)者可能用得到的方法等,比如做些數(shù)據(jù)的加解密
package com.codecoord.rocketmq.domain;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 基礎(chǔ)消息實(shí)體,包含基礎(chǔ)的消息
* 根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
*
* @author tianxincoord@163.com
* @since 2022/6/16
*/
@Data
public abstract class BaseMqMessage {
/**
* 業(yè)務(wù)鍵,用于RocketMQ控制臺(tái)查看消費(fèi)情況
*/
protected String key;
/**
* 發(fā)送消息來(lái)源,用于排查問(wèn)題
*/
protected String source = "";
/**
* 發(fā)送時(shí)間
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 跟蹤id,用于slf4j等日志記錄跟蹤id,方便查詢業(yè)務(wù)鏈
*/
protected String traceId = UUID.randomUUID().toString();
/**
* 重試次數(shù),用于判斷重試次數(shù),超過(guò)重試次數(shù)發(fā)送異常警告
*/
protected Integer retryTimes = 0;
}- 有了此基礎(chǔ)抽象實(shí)體類,那么剩下的所有業(yè)務(wù)消息實(shí)體只需要繼承此基類,然后在自己業(yè)務(wù)類中包含自己需要的字段即可,因?yàn)檫@些公共字段不管是向上轉(zhuǎn)型還是向下轉(zhuǎn)型,子類和父類都可以看得到
2.2.2 RocketMQTemplate
- RocketMQTemplate發(fā)送消息的代碼如果不封裝,我們發(fā)送消息需要這樣
- String destination = topic + ":" + tag;
- template.syncSend(destination, message);
- 每個(gè)人發(fā)送消息都要自己處理這個(gè)冒號(hào),直接傳入topic和tag不香嗎?按照抽離變化點(diǎn)中的變化點(diǎn),只有消息是變化的,除此之外的其他規(guī)則交給封裝類
- RocketMQTemplate主要封裝發(fā)送消息的日志、異常的處理、消息key設(shè)置、等等其他配置
- 封裝代碼類如下,下面包含了主要發(fā)送方式,更多自己添加即可
- 這里就是消息發(fā)送的點(diǎn)餐機(jī)器,同時(shí)也提供了封裝方法也提供原始RocketMQTemplate供使用
- 此處只是提供一種方式,生產(chǎn)中按照項(xiàng)目組商量決定
package com.codecoord.rocketmq.template;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* RocketMQ模板類
*
* @author tianxincoord@163.com
* @since 2022/4/15
*/
@Component
public class RocketMqTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
@Resource(name = "rocketMQTemplate")
private RocketMQTemplate template;
/**
* 獲取模板,如果封裝的方法不夠提供原生的使用方式
*/
public RocketMQTemplate getTemplate() {
return template;
}
/**
* 構(gòu)建目的地
*/
public String buildDestination(String topic, String tag) {
return topic + RocketMqSysConstant.DELIMITER + tag;
}
/**
* 發(fā)送同步消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message) {
// 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理
// 更多的其它基礎(chǔ)業(yè)務(wù)處理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集
LOGGER.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 發(fā)送延遲消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
LOGGER.info("[{}]延遲等級(jí)[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
return sendResult;
}
}- 這個(gè)類是最基礎(chǔ)的原始封裝類,相當(dāng)于餐館提供的點(diǎn)餐服務(wù)。上面提供無(wú)業(yè)務(wù)特性的發(fā)送,比如想要發(fā)送日志消息或者動(dòng)態(tài)發(fā)送消息目的場(chǎng)景
3.2.3 增強(qiáng)RocketMQTemplate
- 以訂單處理中心來(lái)說(shuō),變化點(diǎn)僅僅只是單號(hào)等業(yè)務(wù)數(shù)據(jù)不一樣,發(fā)往訂單處理中心的消息不管是topic還是tag等等其實(shí)完全都一樣,那么此時(shí)可以根據(jù)業(yè)務(wù)來(lái)增加封裝
- 增強(qiáng)原始功能需要注意下面兩個(gè)點(diǎn)
- 所有父類能出現(xiàn)的地方,子類都能出現(xiàn):也就是子類擁有功能 >= 父類 ,比如Java的List,只要入?yún)⑹荓ist的地方,傳ArrayList和LinkedList完全可以
- 增強(qiáng)功能不能改變?cè)脊δ艿男袨?/strong>:比如父類有一個(gè)方法say是說(shuō)話,結(jié)果子類覆寫(xiě)了say改成了行為是吃飯,然后當(dāng)調(diào)用者調(diào)用say的時(shí)候得到了一個(gè)完全預(yù)期外的結(jié)果
- 就以訂單中心消息發(fā)送為例,封裝OrderMessageTemplate繼承自RocketMqTemplate,此時(shí)前者就擁有了封裝父類的所有基礎(chǔ)方法,擁有了所有父類的功能。然后可以在前者增加自身業(yè)務(wù)特性的發(fā)送方法,比如發(fā)送訂單處理消息
package com.codecoord.rocketmq.template;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 訂單類發(fā)送消息模板工具類
*
* @author tianxincode@163.com
* @since 2022/6/16
*/
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
/// 如果不采用繼承也可以直接注入使用
/* @Resource
private RocketMqTemplate rocketMqTemplate; */
/**
* 入?yún)⒅恍枰獋魅胧悄膫€(gè)訂單號(hào)和業(yè)務(wù)體消息即可,其他操作根據(jù)需要處理
* 這樣對(duì)于調(diào)用者而言,可以更加簡(jiǎn)化調(diào)用
*/
public SendResult sendOrderPaid(@NotNull String orderId, String body) {
RocketMqEntityMessage message = new RocketMqEntityMessage();
message.setKey(orderId);
message.setSource("訂單支付");
message.setMessage(body);
// 這兩個(gè)字段只是為了測(cè)試
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
}
}- 此時(shí)對(duì)于調(diào)用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息發(fā)送到訂單處理中心
- 封裝后的好處,假如現(xiàn)在有10個(gè)訂單來(lái)源,現(xiàn)在需要調(diào)整消息發(fā)送格式,如果不進(jìn)行封裝那么10個(gè)來(lái)源發(fā)送的地方都需要改;如果進(jìn)行了二次封裝,只需要改sendOrderPaid方法即可,而且還不會(huì)出錯(cuò),此時(shí)優(yōu)勢(shì)就體現(xiàn)出來(lái)了
2.3 RocketMQListener封裝
- RocketMQListener是消費(fèi)消息的核心,同時(shí)也涉及到更多的操作,比如:基礎(chǔ)日志記錄、異常處理、消息重試、警告通知等等等
- 按照抽離變化點(diǎn),RocketMQListener只應(yīng)該處理與自身業(yè)務(wù)相關(guān)的,除此之外的其它應(yīng)該交給父類,子類只需要告訴父類要不要異常處理、要不要重試等等,點(diǎn)餐式服務(wù)
- 封裝消息消費(fèi)的抽象類
- 注意泛型限定為標(biāo)準(zhǔn)基礎(chǔ)消息類,這樣能到消費(fèi)者的一定有統(tǒng)一的標(biāo)準(zhǔn)類BaseMqMessage
- 下面簡(jiǎn)單封裝示例
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;
/**
* 抽象消息監(jiān)聽(tīng)器,封裝了所有公共處理業(yè)務(wù),如
* 1、基礎(chǔ)日志記錄
* 2、異常處理
* 3、消息重試
* 4、警告通知
* 5、....
*
* @author tianxincoord@163.com
* @since 2022/4/17
*/
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
/**
* 這里的日志記錄器是哪個(gè)子類的就會(huì)被哪個(gè)子類的類進(jìn)行初始化
*/
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 消息者名稱
*
* @return 消費(fèi)者名稱
*/
protected abstract String consumerName();
/**
* 消息處理
*
* @param message 待處理消息
* @throws Exception 消費(fèi)異常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 超過(guò)重試次數(shù)消息,需要啟用isRetry
*
* @param message 待處理消息
*/
protected abstract void overMaxRetryTimesMessage(T message);
/**
* 是否過(guò)濾消息,例如某些
*
* @param message 待處理消息
* @return true: 本次消息被過(guò)濾,false:不過(guò)濾
*/
protected boolean isFilter(T message) {
return false;
}
/**
* 是否異常時(shí)重復(fù)發(fā)送
*
* @return true: 消息重試,false:不重試
*/
protected abstract boolean isRetry();
/**
* 消費(fèi)異常時(shí)是否拋出異常
*
* @return true: 拋出異常,false:消費(fèi)異常(如果沒(méi)有開(kāi)啟重試則消息會(huì)被自動(dòng)ack)
*/
protected abstract boolean isThrowException();
/**
* 最大重試次數(shù)
*
* @return 最大重試次數(shù),默認(rèn)10次
*/
protected int maxRetryTimes() {
return 10;
}
/**
* isRetry開(kāi)啟時(shí),重新入隊(duì)延遲時(shí)間
*
* @return -1:立即入隊(duì)重試
*/
protected int retryDelayLevel() {
return -1;
}
/**
* 由父類來(lái)完成基礎(chǔ)的日志和調(diào)配,下面的只是提供一個(gè)思路
*/
public void dispatchMessage(T message) {
MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
// 基礎(chǔ)日志記錄被父類處理了
logger.info("[{}]消費(fèi)者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
if (isFilter(message)) {
logger.info("消息不滿足消費(fèi)條件,已過(guò)濾");
return;
}
// 超過(guò)最大重試次數(shù)時(shí)調(diào)用子類方法處理
if (message.getRetryTimes() > maxRetryTimes()) {
overMaxRetryTimesMessage(message);
return;
}
try {
long start = Instant.now().toEpochMilli();
handleMessage(message);
long end = Instant.now().toEpochMilli();
logger.info("消息消費(fèi)成功,耗時(shí)[{}ms]", (end - start));
} catch (Exception e) {
logger.error("消息消費(fèi)異常", e);
// 是捕獲異常還是拋出,由子類決定
if (isThrowException()) {
throw new RuntimeException(e);
}
if (isRetry()) {
// 獲取子類RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(annotation)) {
message.setSource(message.getSource() + "消息重試");
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時(shí)不走延遲消息)
// 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息
sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
// 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重試消息發(fā)送失敗");
}
}
}
}
}
}- 封裝消費(fèi)最終類
- 注意:收到的消息是先委派給父類,父類進(jìn)行調(diào)度管理
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 實(shí)體類消費(fèi)監(jiān)聽(tīng)器,在實(shí)現(xiàn)RocketMQListener中間還加了一層BaseMqMessageListener來(lái)處理基礎(chǔ)業(yè)務(wù)消息
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG,
// 指定消費(fèi)者線程數(shù),默認(rèn)64,生產(chǎn)中請(qǐng)注意配置,避免過(guò)大或者過(guò)小
consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
implements RocketMQListener<RocketMqEntityMessage> {
/**
* 此處只是說(shuō)明封裝的思想,更多還是要根據(jù)業(yè)務(wù)操作決定
* 內(nèi)功心法有了,無(wú)論什么招式都可以發(fā)揮最大威力
*/
@Override
protected String consumerName() {
return "RocketMQ二次封裝消息消費(fèi)者";
}
@Override
public void onMessage(RocketMqEntityMessage message) {
// 注意,此時(shí)這里沒(méi)有直接處理業(yè)務(wù),而是先委派給父類做基礎(chǔ)操作,然后父類做完基礎(chǔ)操作后會(huì)調(diào)用子類的實(shí)際處理類型
super.dispatchMessage(message);
}
@Override
protected void handleMessage(RocketMqEntityMessage message) throws Exception {
// 此時(shí)這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試
System.out.println("業(yè)務(wù)消息處理");
}
@Override
protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
// 當(dāng)超過(guò)指定重試次數(shù)消息時(shí)此處方法會(huì)被調(diào)用
// 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作
}
@Override
protected boolean isRetry() {
return false;
}
@Override
protected int maxRetryTimes() {
// 指定需要的重試次數(shù),超過(guò)重試次數(shù)overMaxRetryTimesMessage會(huì)被調(diào)用
return 5;
}
@Override
protected boolean isThrowException() {
// 是否拋出異常,到消費(fèi)異常時(shí)是被父類攔截處理還是直接拋出異常
return false;
}
}- 封裝后對(duì)于子類來(lái)說(shuō),只需要告訴父類要不要做就擁有了最開(kāi)始說(shuō)的所有功能,簡(jiǎn)化了使用,此時(shí)子類消費(fèi)者只需要專注于自己的業(yè)務(wù)核心處理就可以了
2.4 廣播消息的應(yīng)用場(chǎng)景
- 應(yīng)用場(chǎng)景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
- 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時(shí)候,此時(shí)可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無(wú)需一個(gè)節(jié)點(diǎn)一個(gè)節(jié)點(diǎn)的處理
- 特別說(shuō)明:廣播消息默認(rèn)會(huì)在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會(huì)以www.tianxincoord.com:9876@www.tianxincoord.com:9876這種地址形式生成文件路徑,但是由于帶有:符號(hào),windows下是不允許此符號(hào)作為文件夾名稱的,所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋,否則啟動(dòng)的時(shí)候就會(huì)提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個(gè)問(wèn)題
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 廣播消息
* 應(yīng)用場(chǎng)景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
* 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時(shí)候
* 此時(shí)可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無(wú)需一個(gè)節(jié)點(diǎn)一個(gè)節(jié)點(diǎn)的處理
*
* 特別說(shuō)明:廣播消息默認(rèn)會(huì)在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會(huì)以www.tianxincoord.com:9876@www.tianxincoord.com:9876
* 這種地址形式生成文件路徑,但是由于帶有:符號(hào),windows下是不允許此符號(hào)作為文件夾名稱的
* 所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋
* 否則啟動(dòng)的時(shí)候就會(huì)提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個(gè)問(wèn)題
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
// messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {
/**
* MessageExt:內(nèi)置的消息實(shí)體,生產(chǎn)中根據(jù)需要自己封裝實(shí)體
*/
@Override
public void onMessage(MessageExt message) {
log.info("收到廣播消息【{}】", new String(message.getBody()));
}
}2.3 代碼封裝完結(jié)測(cè)試
封裝測(cè)試大家可以直接參考RocketMqController即可
package com.codecoord.rocketmq.controller;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 消息發(fā)送
*
* @author tianxin01@huice.com
* @since 2022/6/16
*/
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
/**
* 注意此處注入的是封裝的RocketMqTemplate
*/
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 注入對(duì)應(yīng)業(yè)務(wù)的模板類
*/
@Resource
private OrderMessageTemplate orderMessageTemplate;
/**
* 通過(guò)實(shí)體類發(fā)送消息,發(fā)送注意事項(xiàng)請(qǐng)參考實(shí)體類
* 說(shuō)明:也可以在RocketMqTemplate按照業(yè)務(wù)封裝發(fā)送方法,這樣只需要調(diào)用方法指定基礎(chǔ)業(yè)務(wù)消息接口
*/
@RequestMapping("/entity/message")
public Object sendMessage() {
RocketMqEntityMessage message = new RocketMqEntityMessage();
// 設(shè)置業(yè)務(wù)key
message.setKey(UUID.randomUUID().toString());
// 設(shè)置消息來(lái)源,便于查詢we年
message.setSource("封裝測(cè)試");
// 業(yè)務(wù)消息內(nèi)容
message.setMessage("當(dāng)前消息發(fā)送時(shí)間為:" + LocalDateTime.now());
// Java時(shí)間字段需要單獨(dú)處理,否則會(huì)序列化失敗
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
}
/**
* 此時(shí)對(duì)于調(diào)用者而且,無(wú)需創(chuàng)建任何類
* 如果某天需要調(diào)整消息發(fā)送來(lái)源,如果不封裝,所有原來(lái)產(chǎn)生message的地方全部改
* 如果封裝了,只需要改sendOrderPaid就可以切換
*/
@RequestMapping("/order/paid")
public Object sendOrderPaidMessage() {
return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客戶下單了...,快快備貨");
}
/**
* 直接將對(duì)象進(jìn)行傳輸,也可以自己進(jìn)行json轉(zhuǎn)化后傳輸
*/
@RequestMapping("/messageExt/message")
public SendResult convertAndSend() {
// 生產(chǎn)中不推薦使用jsonObject傳遞,不看發(fā)送者無(wú)法知道傳遞的消息包含什么信息
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "messageExt");
String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
// 如果要走內(nèi)部方法發(fā)送則必須要按照標(biāo)準(zhǔn)來(lái),否則就使用原生的消息發(fā)送
return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
}
}到此這篇關(guān)于RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝的文章就介紹到這了,更多相關(guān)RocketMQ SpringBoot封裝內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收的詳細(xì)步驟
- Springboot詳細(xì)講解RocketMQ實(shí)現(xiàn)順序消息的發(fā)送與消費(fèi)流程
- Springboot詳解RocketMQ實(shí)現(xiàn)廣播消息流程
- Springboot?整合?RocketMQ?收發(fā)消息的配置過(guò)程
- SpringBoot中使用RocketMQ的示例代碼
- 解決springboot集成rocketmq關(guān)于tag的坑
- Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
相關(guān)文章
SpringBoot Starter依賴原理與實(shí)例詳解
SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動(dòng)掃描到要加載的信息并啟動(dòng)相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫(kù)的處理,需要配置各種信息的困擾2022-09-09
mybatis水平分表實(shí)現(xiàn)動(dòng)態(tài)表名的項(xiàng)目實(shí)例
本文主要介紹了mybatis水平分表實(shí)現(xiàn)動(dòng)態(tài)表名的項(xiàng)目實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
JavaWeb利用struts實(shí)現(xiàn)文件下載時(shí)改變文件名稱
這篇文章主要為大家詳細(xì)介紹了JavaWeb利用struts實(shí)現(xiàn)文件下載時(shí)改變文件名稱的相關(guān)資料,需要的朋友可以參考下2016-06-06
Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能
這篇文章主要介紹了Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱
緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動(dòng)時(shí),預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,下面我們就來(lái)看看SpringBoot是如何實(shí)現(xiàn)緩存預(yù)熱的吧2024-01-01
Mockito mock Kotlin Object類方法報(bào)錯(cuò)解決方法
這篇文章主要介紹了Mockito mock Kotlin Object類方法報(bào)錯(cuò)解決方法,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-09-09
理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的
今天通過(guò)本文帶領(lǐng)大家學(xué)習(xí)注解的基礎(chǔ)知識(shí),學(xué)習(xí)Spring的@Autowired是怎么實(shí)現(xiàn)的,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-07-07
Redis 集成Spring的示例代碼(spring-data-redis)
本篇文章主要介紹了Redis 集成Spring的示例代碼(spring-data-redis) ,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09

