欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級二次封裝

 更新時間:2022年06月20日 09:19:27   作者:TianXinCoord  
本文主要介紹了RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級二次封裝,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言說明

前置掌握:SpringBoot基礎(chǔ)使用、RocketMQ和SpringBoot的整合使用

  • 主要使用參考第二節(jié)
  • 核心使用參考第一篇文章

文章難度:四顆星

代碼不難,重點(diǎn)是封裝的思想需要體會

不同觀點(diǎn)歡迎大家在評論區(qū)一起討論學(xué)習(xí),沒有對錯之分,每個系統(tǒng)業(yè)務(wù)特性不同,適合系統(tǒng)的才是最好的~

源碼地址https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只會說明核心代碼,其他的基礎(chǔ)整合配置和多環(huán)境自動隔離參考源碼即可

一、為什么要二次封裝

為了不產(chǎn)生歧義,文章中提到的二次封裝均是基于原始使用方式的封裝,而非源碼級別的二次封裝

換句話說:如果都需要對源碼進(jìn)行封裝了,那么說明公司業(yè)務(wù)規(guī)模都到一定程度了,二次封裝這種東西已經(jīng)不需要討論了,封裝已經(jīng)是一個共識

  • 首先明確一點(diǎn):不進(jìn)行二次封裝完全不影響RocketMQ的使用,可以選擇二次封裝和不選擇二次封裝
    • 二次封裝可以提供更多的功能和更簡潔的使用方式
    • 如果一個封裝搞得比原始使用方式更復(fù)雜,那么就失去了二次封裝的意義
    • Q1:二次封裝可不可以不要?完全可以,完全不影響正常使用
    • Q2:二次封裝有沒有必要?仁者見仁智者見智的問題,如果覺得沒有必要那么這篇文章可以跳過~
  • ORM框架中典型的一個二次封裝框架就是MyBatisPlus(簡稱MP),后者是對MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那為什么要用二次封裝后的MP?
    • 場景:大部分的數(shù)據(jù)庫操作,無外乎CRUD,那么最常用的比如(根據(jù)名稱就可以知道這個方法做什么用,就沒有必要再二次說明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。對于上面這5個操作,變化的只是表字段和表名,剩下的語法都是一樣
    • 不封裝:直接使用MyBatis完全可以自己實(shí)現(xiàn)上面方法的功能,但是每個表都需要寫一遍自己上面的方法,假設(shè)有100張表,那么就會多出495個(下面說明)重復(fù)功能代碼,而且所有代碼都是冗余的
    • 封裝后:由封裝者提供上面5個方法的公共實(shí)現(xiàn),然后所有需要使用上面功能的Service只需要繼承封裝好的類就自然的擁有了上面的5大功能,那么代碼的冗余量就從100張表*5個方法==500,去掉封裝的5個,節(jié)省了495的代碼冗余量
  • 所以二次封裝是為了更方便用、更簡潔、更加適用于系統(tǒng),量身打造可以大大提升開發(fā)效率。就如上面的5個方法,完全重復(fù)性的東西為什么要浪費(fèi)開發(fā)時間來做這些冗余的事情呢?

1.1 二次封裝不同觀點(diǎn)

讓我們以一個生活中的蛋炒飯開個頭

原始框架好比提供了原材料:廚具、雞蛋,米飯等食材、菜譜

  • 對于框架的使用通常有以下兩種方式
  • 第一種:根據(jù)菜譜來進(jìn)行做飯(使用原生的方法調(diào)用),洗菜、做飯、刷碗、打掃不管啥入?yún)⒆约汗?/li>
  • 第二種:找一個人來學(xué)會這道菜(負(fù)責(zé)二次封裝的人)的多種做法(封裝大部分業(yè)務(wù)場景)并做成一種點(diǎn)餐式的服務(wù),誰想吃哪種類型的蛋炒飯直接點(diǎn)餐(調(diào)用封裝好的方法)就可以吃上香噴噴的蛋炒飯

問題:哪種方案更好? 答案:兩種各有各的優(yōu)勢(在說廢話,哈哈~)

  • 第一種:原生方式
    • 優(yōu)點(diǎn):可以按照各種方式靈活調(diào)用,比如每個人都使用RocketMQTemplate原生send發(fā)送消息,想要發(fā)送什么類型的消息就發(fā)送什么類型的消息,比較隨意
    • 缺點(diǎn):代碼大量冗余,從構(gòu)建參數(shù)對象、發(fā)送消息、消費(fèi)消息、異常處理、日志記錄、異常重試啥啥的都是自己搞,每個消費(fèi)隊(duì)列就會出現(xiàn)上面所有的步驟。比如現(xiàn)在有一個訂單處理中心A接收來自各種類型訂單,此時如B、C兩個原始訂單來源想讓A處理訂單,那么B和C都需要按照A的要求進(jìn)行調(diào)用,代碼會冗余
  • 第二種:點(diǎn)餐式服務(wù)
    • 優(yōu)點(diǎn):封裝了大部分統(tǒng)一的方法調(diào)用,比如 發(fā)送消息、異常處理、日志記錄等等都是重復(fù)的,封裝后點(diǎn)餐的人不需要再關(guān)系這一部分要怎么處理,只需要告訴點(diǎn)餐服務(wù)要不要進(jìn)行異常、要不異常重試等等,那么此時對于點(diǎn)餐人來說只需要 付錢(調(diào)用服務(wù))、吃飯(消費(fèi)消息),除此之外啥也不用管,全部由點(diǎn)餐服務(wù)提供者完成所有上述兩個步驟外的其它操作
    • 缺點(diǎn):無法滿足所有點(diǎn)餐人的要求,有的人喜歡味道重一點(diǎn),有的喜歡味道淡一點(diǎn)。但是這個缺點(diǎn)完全可以處理,比如點(diǎn)餐服務(wù)提供了自定義廚房(返回原始發(fā)送對象),此時調(diào)用者可以按照第一種方式進(jìn)行使用
  • 選哪種?
    • 個人而言:業(yè)務(wù)系統(tǒng)復(fù)雜的優(yōu)先選擇第二種 ,簡單業(yè)務(wù)的選擇第一種(盡量采用封裝,后續(xù)維護(hù)方便)。對于一個復(fù)雜的系統(tǒng),本身業(yè)務(wù)級的代碼就已經(jīng)很多了,結(jié)果還要每個人處理全部一樣的東西,消費(fèi)者越多代碼冗余越多。如果一個系統(tǒng)只是為了使用MQ來進(jìn)行業(yè)務(wù)分離,消費(fèi)者也不多,那么可以選擇最快的方式,但是最終會選擇第二種,如果業(yè)務(wù)隨著時間增長越復(fù)雜,越晚改成第二種花費(fèi)的代價越大!
    • 第一種就好比此時我們要直接操作內(nèi)存,原生操作就好比C++或C,可以直接操作內(nèi)存,但是同時用完后還要自己寫各種異常處理和釋放內(nèi)存;代碼封裝就好比Java,我們只需要告訴Java我們要使用內(nèi)存,然后用完就不用管
  • 企業(yè)中,業(yè)務(wù)功能產(chǎn)出是一級優(yōu)先級,在此之上才能有更高級的東西。技術(shù)服務(wù)于業(yè)務(wù),而不是業(yè)務(wù)服務(wù)于技術(shù)!比如現(xiàn)在30個人的系統(tǒng),我們要使用緩存加速訪問,那么我們是選擇 內(nèi)部緩存(直接用集合或者map存起來)還是用Redis?
    • 內(nèi)部緩存和Redis能不能達(dá)到目的?能
    • 哪個更方便更快?內(nèi)部緩存!內(nèi)部對象就很快實(shí)現(xiàn)
    • 如果業(yè)務(wù)發(fā)展遲早會轉(zhuǎn)為Redis這種專業(yè)的緩存中間件,就好比業(yè)務(wù)發(fā)展前第一種,業(yè)務(wù)發(fā)展后選擇第二種,但是對于大部分業(yè)務(wù)系統(tǒng)來說功能增加是很快的,特別是產(chǎn)品同事上一分鐘提需求下一分鐘就要上線這種(開個玩笑~),所以我們在引用一個技術(shù)需不需要進(jìn)行二次封裝時需要技術(shù)負(fù)責(zé)人對業(yè)務(wù)增長有一個預(yù)判。建議是都進(jìn)行封裝一下

1.2 封裝的抽離點(diǎn)

  • 對于二次封裝,其中最主要的就是找出該框架在日常使用中所出現(xiàn)的大部分涉及到的操作,然后找出變化操作和不變化操作
  • RocketMQ日常使用主要場景為例:
    • 發(fā)送消息階段:準(zhǔn)備需要發(fā)送的消息、發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
    • 消費(fèi)消息階段:記錄接收消息日志、業(yè)務(wù)處理、業(yè)務(wù)日志記錄、異常處理、異常重試、異常通知、死信處理
  • 提取變化點(diǎn)和不變化點(diǎn)(可以抽取為公共處理的場景)
    • 發(fā)送消息階段:
      • 變化點(diǎn):準(zhǔn)備需要發(fā)送的消息
      • 不變化點(diǎn):發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
    • 消費(fèi)消息階段
      • 變化點(diǎn):業(yè)務(wù)處理、業(yè)務(wù)日志記錄
      • 不變化點(diǎn):記錄接收消息日志、異常處理、異常重試、異常通知、死信處理
  • 從上可以看到,對于RocketMQ的使用,大部分場景都是可以抽離成一個公共的方法處理,只有業(yè)務(wù)級的需要自己處理,所以如果我們把不變化場景抽取后,每個同事只需要寫自己業(yè)務(wù)相關(guān)部分即可
  • 抽取后的復(fù)雜度:對于新加一個消費(fèi)者,只需要處理業(yè)務(wù)相關(guān)三個場景(準(zhǔn)備需要發(fā)送的消息、業(yè)務(wù)處理、業(yè)務(wù)日志記錄),剩下的九個場景,只需要封裝一次就可以。需要現(xiàn)在就幾十個消費(fèi)者,可以想想一些減少了多少代碼冗余

1.3 設(shè)計(jì)模式的應(yīng)用

  • 要封裝出一個好的抽象層,【設(shè)計(jì)模式】建議好好體會和學(xué)習(xí)一下
  • 設(shè)計(jì)模式對于用不到的人來說比較虛幻,對于用的到的人來說,這個真牛X

二、二次封裝核心要點(diǎn)

2.1 二次封裝核心點(diǎn)

2.1.1 封裝主要討論點(diǎn)

  • 對于RocketMQ或者說對于整個MQ體系來說(不管是RabbitMQ、RocketMQ、Kafka)等封裝的核心主要有兩個:發(fā)送消息、消費(fèi)消息者兩個場景
  • 對于RocketMQ我們主要討論三個地方:RocketMQTemplate封裝、RocketMQListener封裝和廣播消息的封裝
  • 廣播消息是分布式系統(tǒng)中同時讓所有節(jié)點(diǎn)都干一件事情的一個好的方式,如果用不到忽略廣播消息即可

2.1.2 發(fā)送/消費(fèi)的幾種消息實(shí)體

  • RocketMQ發(fā)送消息對于不同的使用來說,大部分選擇下面的幾種發(fā)送消息類型
    • A、發(fā)送Json對象,比如Fastjson的JSONObject
    • B、直接發(fā)送轉(zhuǎn)Json后的String對象
    • C、根據(jù)業(yè)務(wù)封裝對應(yīng)實(shí)體類
    • D、直接使用原生MessageExt接收
  • 怎么選擇?怎么選擇才是最優(yōu)?
    • 上面哪一種都可以達(dá)到目的,如果要統(tǒng)一封裝就必須要有一個標(biāo)準(zhǔn)
    • 怎么選擇只需要回答這個問題:在不看消息發(fā)送者的情況下,消費(fèi)者怎么知道發(fā)送者發(fā)送的消息含義?
    • 比如現(xiàn)在有一個訂單消息,如果我們不看消息發(fā)送者,怎么知道發(fā)送者給消費(fèi)者發(fā)送哪些字段
      • A、B、D可以嗎?一定不可以!JSON對象和String對象,如果我們不看消息發(fā)送者不可能知道到底發(fā)送了啥,這點(diǎn)我相信沒有可以討論的地方,因?yàn)轭愋蜎Q定了這個操作不可能
      • C可以嗎?可以!此時不需要看消息發(fā)送者,只需要看消費(fèi)者的實(shí)體類點(diǎn)進(jìn)去,有哪些業(yè)務(wù)字段一清二楚
      • 可能有杠要抬了,有看實(shí)體類的功夫,我看消息發(fā)送者都看完了
        • 靈魂拷問1:如果消息發(fā)送者和消費(fèi)者不在一個系統(tǒng)怎么看?邪魅一笑,不同業(yè)務(wù)線可能沒代碼權(quán)限吧?分布式系統(tǒng)完全獨(dú)立可能吧?
        • 靈魂拷問2:如果現(xiàn)在需要一個功能,如果某些必須要的字段消息發(fā)送者如果沒有給的話需要校驗(yàn),普通String和JSONObject怎么實(shí)現(xiàn)?換成實(shí)體類呢?
  • 基于上述討論點(diǎn),封裝建議基于實(shí)體類來,實(shí)體類不管是排查問題、新人熟悉系統(tǒng)代碼、信息校驗(yàn)等String和JSONObject無法像實(shí)體類一樣輕松勝任

2.2 RocketMQTemplate封裝

2.2.1 封裝基礎(chǔ)實(shí)體類

  • 基礎(chǔ)消息實(shí)體類封裝了除了業(yè)務(wù)消息外所有其他公共字段,主要看下面代碼中的字段和注釋
  • 基礎(chǔ)抽象消息實(shí)體,包含基礎(chǔ)的消息、根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
    • 其中也可以包含所有消費(fèi)者可能用得到的方法等,比如做些數(shù)據(jù)的加解密
package com.codecoord.rocketmq.domain;

import lombok.Data;

import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 基礎(chǔ)消息實(shí)體,包含基礎(chǔ)的消息
 * 根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
 *
 * @author tianxincoord@163.com
 * @since 2022/6/16
 */
@Data
public abstract class BaseMqMessage {
    /**
     * 業(yè)務(wù)鍵,用于RocketMQ控制臺查看消費(fèi)情況
     */
    protected String key;
    /**
     * 發(fā)送消息來源,用于排查問題
     */
    protected String source = "";
    /**
     * 發(fā)送時間
     */
    protected LocalDateTime sendTime = LocalDateTime.now();
    /**
     * 跟蹤id,用于slf4j等日志記錄跟蹤id,方便查詢業(yè)務(wù)鏈
     */
    protected String traceId = UUID.randomUUID().toString();
    /**
     * 重試次數(shù),用于判斷重試次數(shù),超過重試次數(shù)發(fā)送異常警告
     */
    protected Integer retryTimes = 0;
}
  • 有了此基礎(chǔ)抽象實(shí)體類,那么剩下的所有業(yè)務(wù)消息實(shí)體只需要繼承此基類,然后在自己業(yè)務(wù)類中包含自己需要的字段即可,因?yàn)檫@些公共字段不管是向上轉(zhuǎn)型還是向下轉(zhuǎn)型,子類和父類都可以看得到

2.2.2 RocketMQTemplate

  • RocketMQTemplate發(fā)送消息的代碼如果不封裝,我們發(fā)送消息需要這樣
    • String destination = topic + ":" + tag;
    • template.syncSend(destination, message);
  • 每個人發(fā)送消息都要自己處理這個冒號,直接傳入topic和tag不香嗎?按照抽離變化點(diǎn)中的變化點(diǎn),只有消息是變化的,除此之外的其他規(guī)則交給封裝類
  • RocketMQTemplate主要封裝發(fā)送消息的日志、異常的處理、消息key設(shè)置、等等其他配置
  • 封裝代碼類如下,下面包含了主要發(fā)送方式,更多自己添加即可
    • 這里就是消息發(fā)送的點(diǎn)餐機(jī)器,同時也提供了封裝方法也提供原始RocketMQTemplate供使用
    • 此處只是提供一種方式,生產(chǎn)中按照項(xiàng)目組商量決定
package com.codecoord.rocketmq.template;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RocketMQ模板類
 *
 * @author tianxincoord@163.com
 * @since 2022/4/15
 */
@Component
public class RocketMqTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate template;

    /**
     * 獲取模板,如果封裝的方法不夠提供原生的使用方式
     */
    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 構(gòu)建目的地
     */
    public String buildDestination(String topic, String tag) {
        return topic + RocketMqSysConstant.DELIMITER + tag;
    }

    /**
     * 發(fā)送同步消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message) {
        // 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理
        // 更多的其它基礎(chǔ)業(yè)務(wù)處理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集
        LOGGER.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 發(fā)送延遲消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        LOGGER.info("[{}]延遲等級[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
        return sendResult;
    }
}
  • 這個類是最基礎(chǔ)的原始封裝類,相當(dāng)于餐館提供的點(diǎn)餐服務(wù)。上面提供無業(yè)務(wù)特性的發(fā)送,比如想要發(fā)送日志消息或者動態(tài)發(fā)送消息目的場景

3.2.3 增強(qiáng)RocketMQTemplate

  • 以訂單處理中心來說,變化點(diǎn)僅僅只是單號等業(yè)務(wù)數(shù)據(jù)不一樣,發(fā)往訂單處理中心的消息不管是topic還是tag等等其實(shí)完全都一樣,那么此時可以根據(jù)業(yè)務(wù)來增加封裝
  • 增強(qiáng)原始功能需要注意下面兩個點(diǎn)
    • 所有父類能出現(xiàn)的地方,子類都能出現(xiàn):也就是子類擁有功能 >= 父類 ,比如Java的List,只要入?yún)⑹荓ist的地方,傳ArrayList和LinkedList完全可以
    • 增強(qiáng)功能不能改變原始功能的行為:比如父類有一個方法say是說話,結(jié)果子類覆寫了say改成了行為是吃飯,然后當(dāng)調(diào)用者調(diào)用say的時候得到了一個完全預(yù)期外的結(jié)果
  • 就以訂單中心消息發(fā)送為例,封裝OrderMessageTemplate繼承自RocketMqTemplate,此時前者就擁有了封裝父類的所有基礎(chǔ)方法,擁有了所有父類的功能。然后可以在前者增加自身業(yè)務(wù)特性的發(fā)送方法,比如發(fā)送訂單處理消息
package com.codecoord.rocketmq.template;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * 訂單類發(fā)送消息模板工具類
 *
 * @author tianxincode@163.com
 * @since 2022/6/16
 */
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
    /// 如果不采用繼承也可以直接注入使用
    /* @Resource
    private RocketMqTemplate rocketMqTemplate; */

    /**
     * 入?yún)⒅恍枰獋魅胧悄膫€訂單號和業(yè)務(wù)體消息即可,其他操作根據(jù)需要處理
     * 這樣對于調(diào)用者而言,可以更加簡化調(diào)用
     */
    public SendResult sendOrderPaid(@NotNull String orderId, String body) {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        message.setKey(orderId);
        message.setSource("訂單支付");
        message.setMessage(body);
        // 這兩個字段只是為了測試
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
    }
}
  • 此時對于調(diào)用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息發(fā)送到訂單處理中心
  • 封裝后的好處,假如現(xiàn)在有10個訂單來源,現(xiàn)在需要調(diào)整消息發(fā)送格式,如果不進(jìn)行封裝那么10個來源發(fā)送的地方都需要改;如果進(jìn)行了二次封裝,只需要改sendOrderPaid方法即可,而且還不會出錯,此時優(yōu)勢就體現(xiàn)出來了

2.3 RocketMQListener封裝

  • RocketMQListener是消費(fèi)消息的核心,同時也涉及到更多的操作,比如:基礎(chǔ)日志記錄、異常處理、消息重試、警告通知等等等
  • 按照抽離變化點(diǎn),RocketMQListener只應(yīng)該處理與自身業(yè)務(wù)相關(guān)的,除此之外的其它應(yīng)該交給父類,子類只需要告訴父類要不要異常處理、要不要重試等等,點(diǎn)餐式服務(wù)
  • 封裝消息消費(fèi)的抽象類
    • 注意泛型限定為標(biāo)準(zhǔn)基礎(chǔ)消息類,這樣能到消費(fèi)者的一定有統(tǒng)一的標(biāo)準(zhǔn)類BaseMqMessage
    • 下面簡單封裝示例
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;

/**
 * 抽象消息監(jiān)聽器,封裝了所有公共處理業(yè)務(wù),如
 * 1、基礎(chǔ)日志記錄
 * 2、異常處理
 * 3、消息重試
 * 4、警告通知
 * 5、....
 *
 * @author tianxincoord@163.com
 * @since 2022/4/17
 */
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
    /**
     * 這里的日志記錄器是哪個子類的就會被哪個子類的類進(jìn)行初始化
     */
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RocketMqTemplate rocketMqTemplate;

    /**
     * 消息者名稱
     *
     * @return 消費(fèi)者名稱
     */
    protected abstract String consumerName();

    /**
     * 消息處理
     *
     * @param message 待處理消息
     * @throws Exception 消費(fèi)異常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超過重試次數(shù)消息,需要啟用isRetry
     *
     * @param message 待處理消息
     */
    protected abstract void overMaxRetryTimesMessage(T message);
    /**
     * 是否過濾消息,例如某些
     *
     * @param message 待處理消息
     * @return true: 本次消息被過濾,false:不過濾
     */
    protected boolean isFilter(T message) {
        return false;
    }

    /**
     * 是否異常時重復(fù)發(fā)送
     *
     * @return true: 消息重試,false:不重試
     */
    protected abstract boolean isRetry();

    /**
     * 消費(fèi)異常時是否拋出異常
     *
     * @return true: 拋出異常,false:消費(fèi)異常(如果沒有開啟重試則消息會被自動ack)
     */
    protected abstract boolean isThrowException();

    /**
     * 最大重試次數(shù)
     *
     * @return 最大重試次數(shù),默認(rèn)10次
     */
    protected int maxRetryTimes() {
        return 10;
    }

    /**
     * isRetry開啟時,重新入隊(duì)延遲時間
     *
     * @return -1:立即入隊(duì)重試
     */
    protected int retryDelayLevel() {
        return -1;
    }

    /**
     * 由父類來完成基礎(chǔ)的日志和調(diào)配,下面的只是提供一個思路
     */
    public void dispatchMessage(T message) {
        MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
        // 基礎(chǔ)日志記錄被父類處理了
        logger.info("[{}]消費(fèi)者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
        if (isFilter(message)) {
            logger.info("消息不滿足消費(fèi)條件,已過濾");
            return;
        }
        // 超過最大重試次數(shù)時調(diào)用子類方法處理
        if (message.getRetryTimes() > maxRetryTimes()) {
            overMaxRetryTimesMessage(message);
            return;
        }
        try {
            long start = Instant.now().toEpochMilli();
            handleMessage(message);
            long end = Instant.now().toEpochMilli();
            logger.info("消息消費(fèi)成功,耗時[{}ms]", (end - start));
        } catch (Exception e) {
            logger.error("消息消費(fèi)異常", e);
            // 是捕獲異常還是拋出,由子類決定
            if (isThrowException()) {
                throw new RuntimeException(e);
            }
            if (isRetry()) {
                // 獲取子類RocketMQMessageListener注解拿到topic和tag
                RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
                if (Objects.nonNull(annotation)) {
                    message.setSource(message.getSource() + "消息重試");
                    message.setRetryTimes(message.getRetryTimes() + 1);
                    SendResult sendResult;
                    try {
                        // 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時不走延遲消息)
                        // 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息
                        sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    // 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RuntimeException("重試消息發(fā)送失敗");
                    }
                }
            }
        }
    }
}
  • 封裝消費(fèi)最終類
    • 注意:收到的消息是先委派給父類,父類進(jìn)行調(diào)度管理
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 實(shí)體類消費(fèi)監(jiān)聽器,在實(shí)現(xiàn)RocketMQListener中間還加了一層BaseMqMessageListener來處理基礎(chǔ)業(yè)務(wù)消息
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定消費(fèi)者線程數(shù),默認(rèn)64,生產(chǎn)中請注意配置,避免過大或者過小
        consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
                                         implements RocketMQListener<RocketMqEntityMessage> {
    /**
     * 此處只是說明封裝的思想,更多還是要根據(jù)業(yè)務(wù)操作決定
     * 內(nèi)功心法有了,無論什么招式都可以發(fā)揮最大威力
     */
    @Override
    protected String consumerName() {
        return "RocketMQ二次封裝消息消費(fèi)者";
    }

    @Override
    public void onMessage(RocketMqEntityMessage message) {
        // 注意,此時這里沒有直接處理業(yè)務(wù),而是先委派給父類做基礎(chǔ)操作,然后父類做完基礎(chǔ)操作后會調(diào)用子類的實(shí)際處理類型
        super.dispatchMessage(message);
    }

    @Override
    protected void handleMessage(RocketMqEntityMessage message) throws Exception {
        // 此時這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試
        System.out.println("業(yè)務(wù)消息處理");
    }

    @Override
    protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
        // 當(dāng)超過指定重試次數(shù)消息時此處方法會被調(diào)用
        // 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作
    }

    @Override
    protected boolean isRetry() {
        return false;
    }

    @Override
    protected int maxRetryTimes() {
        // 指定需要的重試次數(shù),超過重試次數(shù)overMaxRetryTimesMessage會被調(diào)用
        return 5;
    }

    @Override
    protected boolean isThrowException() {
        // 是否拋出異常,到消費(fèi)異常時是被父類攔截處理還是直接拋出異常
        return false;
    }
}
  • 封裝后對于子類來說,只需要告訴父類要不要做就擁有了最開始說的所有功能,簡化了使用,此時子類消費(fèi)者只需要專注于自己的業(yè)務(wù)核心處理就可以了

2.4 廣播消息的應(yīng)用場景

  • 應(yīng)用場景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
  • 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時候,此時可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無需一個節(jié)點(diǎn)一個節(jié)點(diǎn)的處理
  • 特別說明:廣播消息默認(rèn)會在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的,所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋,否則啟動的時候就會提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個問題
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 廣播消息
 * 應(yīng)用場景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
 *      也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時候
 * 此時可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無需一個節(jié)點(diǎn)一個節(jié)點(diǎn)的處理
 *
 * 特別說明:廣播消息默認(rèn)會在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876
 *      這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的
 *      所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋
 *      否則啟動的時候就會提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個問題
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
        // messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {

    /**
     * MessageExt:內(nèi)置的消息實(shí)體,生產(chǎn)中根據(jù)需要自己封裝實(shí)體
     */
    @Override
    public void onMessage(MessageExt message) {
        log.info("收到廣播消息【{}】", new String(message.getBody()));
    }
}

2.3 代碼封裝完結(jié)測試

封裝測試大家可以直接參考RocketMqController即可

package com.codecoord.rocketmq.controller;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 消息發(fā)送
 *
 * @author tianxin01@huice.com
 * @since 2022/6/16
 */
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    /**
     * 注意此處注入的是封裝的RocketMqTemplate
     */
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    /**
     * 注入對應(yīng)業(yè)務(wù)的模板類
     */
    @Resource
    private OrderMessageTemplate orderMessageTemplate;

    /**
     * 通過實(shí)體類發(fā)送消息,發(fā)送注意事項(xiàng)請參考實(shí)體類
     * 說明:也可以在RocketMqTemplate按照業(yè)務(wù)封裝發(fā)送方法,這樣只需要調(diào)用方法指定基礎(chǔ)業(yè)務(wù)消息接口
     */
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        // 設(shè)置業(yè)務(wù)key
        message.setKey(UUID.randomUUID().toString());
        // 設(shè)置消息來源,便于查詢we年
        message.setSource("封裝測試");
        // 業(yè)務(wù)消息內(nèi)容
        message.setMessage("當(dāng)前消息發(fā)送時間為:" + LocalDateTime.now());
        // Java時間字段需要單獨(dú)處理,否則會序列化失敗
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
    }

    /**
     * 此時對于調(diào)用者而且,無需創(chuàng)建任何類
     * 如果某天需要調(diào)整消息發(fā)送來源,如果不封裝,所有原來產(chǎn)生message的地方全部改
     * 如果封裝了,只需要改sendOrderPaid就可以切換
     */
    @RequestMapping("/order/paid")
    public Object sendOrderPaidMessage() {
        return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客戶下單了...,快快備貨");
    }

    /**
     * 直接將對象進(jìn)行傳輸,也可以自己進(jìn)行json轉(zhuǎn)化后傳輸
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        // 生產(chǎn)中不推薦使用jsonObject傳遞,不看發(fā)送者無法知道傳遞的消息包含什么信息
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
        // 如果要走內(nèi)部方法發(fā)送則必須要按照標(biāo)準(zhǔn)來,否則就使用原生的消息發(fā)送
        return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
    }
}

到此這篇關(guān)于RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級二次封裝的文章就介紹到這了,更多相關(guān)RocketMQ SpringBoot封裝內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot Starter依賴原理與實(shí)例詳解

    SpringBoot Starter依賴原理與實(shí)例詳解

    SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動掃描到要加載的信息并啟動相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫的處理,需要配置各種信息的困擾
    2022-09-09
  • mybatis水平分表實(shí)現(xiàn)動態(tài)表名的項(xiàng)目實(shí)例

    mybatis水平分表實(shí)現(xiàn)動態(tài)表名的項(xiàng)目實(shí)例

    本文主要介紹了mybatis水平分表實(shí)現(xiàn)動態(tài)表名的項(xiàng)目實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • idea如何將指定目錄打成jar包

    idea如何將指定目錄打成jar包

    這篇文章主要介紹了idea如何將指定目錄打成jar包問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Java運(yùn)行Jar包內(nèi)存配置的操作

    Java運(yùn)行Jar包內(nèi)存配置的操作

    這篇文章主要介紹了Java運(yùn)行Jar包內(nèi)存配置的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • JavaWeb利用struts實(shí)現(xiàn)文件下載時改變文件名稱

    JavaWeb利用struts實(shí)現(xiàn)文件下載時改變文件名稱

    這篇文章主要為大家詳細(xì)介紹了JavaWeb利用struts實(shí)現(xiàn)文件下載時改變文件名稱的相關(guān)資料,需要的朋友可以參考下
    2016-06-06
  • Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能

    Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能

    這篇文章主要介紹了Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-09-09
  • 詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動時,預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,下面我們就來看看SpringBoot是如何實(shí)現(xiàn)緩存預(yù)熱的吧
    2024-01-01
  • Mockito mock Kotlin Object類方法報錯解決方法

    Mockito mock Kotlin Object類方法報錯解決方法

    這篇文章主要介紹了Mockito mock Kotlin Object類方法報錯解決方法,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-09-09
  • 理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的

    理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的

    今天通過本文帶領(lǐng)大家學(xué)習(xí)注解的基礎(chǔ)知識,學(xué)習(xí)Spring的@Autowired是怎么實(shí)現(xiàn)的,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2021-07-07
  • Redis 集成Spring的示例代碼(spring-data-redis)

    Redis 集成Spring的示例代碼(spring-data-redis)

    本篇文章主要介紹了Redis 集成Spring的示例代碼(spring-data-redis) ,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-09-09

最新評論