欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝

 更新時(shí)間:2022年06月20日 09:19:27   作者:TianXinCoord  
本文主要介紹了RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前言說(shuō)明

前置掌握:SpringBoot基礎(chǔ)使用、RocketMQ和SpringBoot的整合使用

  • 主要使用參考第二節(jié)
  • 核心使用參考第一篇文章

文章難度:四顆星

代碼不難,重點(diǎn)是封裝的思想需要體會(huì)

不同觀點(diǎn)歡迎大家在評(píng)論區(qū)一起討論學(xué)習(xí),沒(méi)有對(duì)錯(cuò)之分,每個(gè)系統(tǒng)業(yè)務(wù)特性不同,適合系統(tǒng)的才是最好的~

源碼地址https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只會(huì)說(shuō)明核心代碼,其他的基礎(chǔ)整合配置和多環(huán)境自動(dòng)隔離參考源碼即可

一、為什么要二次封裝

為了不產(chǎn)生歧義,文章中提到的二次封裝均是基于原始使用方式的封裝,而非源碼級(jí)別的二次封裝

換句話說(shuō):如果都需要對(duì)源碼進(jìn)行封裝了,那么說(shuō)明公司業(yè)務(wù)規(guī)模都到一定程度了,二次封裝這種東西已經(jīng)不需要討論了,封裝已經(jīng)是一個(gè)共識(shí)

  • 首先明確一點(diǎn):不進(jìn)行二次封裝完全不影響RocketMQ的使用,可以選擇二次封裝和不選擇二次封裝
    • 二次封裝可以提供更多的功能和更簡(jiǎn)潔的使用方式
    • 如果一個(gè)封裝搞得比原始使用方式更復(fù)雜,那么就失去了二次封裝的意義
    • Q1:二次封裝可不可以不要?完全可以,完全不影響正常使用
    • Q2:二次封裝有沒(méi)有必要?仁者見(jiàn)仁智者見(jiàn)智的問(wèn)題,如果覺(jué)得沒(méi)有必要那么這篇文章可以跳過(guò)~
  • ORM框架中典型的一個(gè)二次封裝框架就是MyBatisPlus(簡(jiǎn)稱MP),后者是對(duì)MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那為什么要用二次封裝后的MP?
    • 場(chǎng)景:大部分的數(shù)據(jù)庫(kù)操作,無(wú)外乎CRUD,那么最常用的比如(根據(jù)名稱就可以知道這個(gè)方法做什么用,就沒(méi)有必要再二次說(shuō)明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。對(duì)于上面這5個(gè)操作,變化的只是表字段和表名,剩下的語(yǔ)法都是一樣
    • 不封裝:直接使用MyBatis完全可以自己實(shí)現(xiàn)上面方法的功能,但是每個(gè)表都需要寫(xiě)一遍自己上面的方法,假設(shè)有100張表,那么就會(huì)多出495個(gè)(下面說(shuō)明)重復(fù)功能代碼,而且所有代碼都是冗余的
    • 封裝后:由封裝者提供上面5個(gè)方法的公共實(shí)現(xiàn),然后所有需要使用上面功能的Service只需要繼承封裝好的類就自然的擁有了上面的5大功能,那么代碼的冗余量就從100張表*5個(gè)方法==500,去掉封裝的5個(gè),節(jié)省了495的代碼冗余量
  • 所以二次封裝是為了更方便用、更簡(jiǎn)潔、更加適用于系統(tǒng),量身打造可以大大提升開(kāi)發(fā)效率。就如上面的5個(gè)方法,完全重復(fù)性的東西為什么要浪費(fèi)開(kāi)發(fā)時(shí)間來(lái)做這些冗余的事情呢?

1.1 二次封裝不同觀點(diǎn)

讓我們以一個(gè)生活中的蛋炒飯開(kāi)個(gè)頭

原始框架好比提供了原材料:廚具、雞蛋,米飯等食材、菜譜

  • 對(duì)于框架的使用通常有以下兩種方式
  • 第一種:根據(jù)菜譜來(lái)進(jìn)行做飯(使用原生的方法調(diào)用),洗菜、做飯、刷碗、打掃不管啥入?yún)⒆约汗?/li>
  • 第二種:找一個(gè)人來(lái)學(xué)會(huì)這道菜(負(fù)責(zé)二次封裝的人)的多種做法(封裝大部分業(yè)務(wù)場(chǎng)景)并做成一種點(diǎn)餐式的服務(wù),誰(shuí)想吃哪種類型的蛋炒飯直接點(diǎn)餐(調(diào)用封裝好的方法)就可以吃上香噴噴的蛋炒飯

問(wèn)題:哪種方案更好? 答案:兩種各有各的優(yōu)勢(shì)(在說(shuō)廢話,哈哈~)

  • 第一種:原生方式
    • 優(yōu)點(diǎn):可以按照各種方式靈活調(diào)用,比如每個(gè)人都使用RocketMQTemplate原生send發(fā)送消息,想要發(fā)送什么類型的消息就發(fā)送什么類型的消息,比較隨意
    • 缺點(diǎn):代碼大量冗余,從構(gòu)建參數(shù)對(duì)象、發(fā)送消息、消費(fèi)消息、異常處理、日志記錄、異常重試啥啥的都是自己搞,每個(gè)消費(fèi)隊(duì)列就會(huì)出現(xiàn)上面所有的步驟。比如現(xiàn)在有一個(gè)訂單處理中心A接收來(lái)自各種類型訂單,此時(shí)如B、C兩個(gè)原始訂單來(lái)源想讓A處理訂單,那么B和C都需要按照A的要求進(jìn)行調(diào)用,代碼會(huì)冗余
  • 第二種:點(diǎn)餐式服務(wù)
    • 優(yōu)點(diǎn):封裝了大部分統(tǒng)一的方法調(diào)用,比如 發(fā)送消息、異常處理、日志記錄等等都是重復(fù)的,封裝后點(diǎn)餐的人不需要再關(guān)系這一部分要怎么處理,只需要告訴點(diǎn)餐服務(wù)要不要進(jìn)行異常、要不異常重試等等,那么此時(shí)對(duì)于點(diǎn)餐人來(lái)說(shuō)只需要 付錢(qián)(調(diào)用服務(wù))、吃飯(消費(fèi)消息),除此之外啥也不用管,全部由點(diǎn)餐服務(wù)提供者完成所有上述兩個(gè)步驟外的其它操作
    • 缺點(diǎn):無(wú)法滿足所有點(diǎn)餐人的要求,有的人喜歡味道重一點(diǎn),有的喜歡味道淡一點(diǎn)。但是這個(gè)缺點(diǎn)完全可以處理,比如點(diǎn)餐服務(wù)提供了自定義廚房(返回原始發(fā)送對(duì)象),此時(shí)調(diào)用者可以按照第一種方式進(jìn)行使用
  • 選哪種?
    • 個(gè)人而言:業(yè)務(wù)系統(tǒng)復(fù)雜的優(yōu)先選擇第二種 ,簡(jiǎn)單業(yè)務(wù)的選擇第一種(盡量采用封裝,后續(xù)維護(hù)方便)。對(duì)于一個(gè)復(fù)雜的系統(tǒng),本身業(yè)務(wù)級(jí)的代碼就已經(jīng)很多了,結(jié)果還要每個(gè)人處理全部一樣的東西,消費(fèi)者越多代碼冗余越多。如果一個(gè)系統(tǒng)只是為了使用MQ來(lái)進(jìn)行業(yè)務(wù)分離,消費(fèi)者也不多,那么可以選擇最快的方式,但是最終會(huì)選擇第二種,如果業(yè)務(wù)隨著時(shí)間增長(zhǎng)越復(fù)雜,越晚改成第二種花費(fèi)的代價(jià)越大!
    • 第一種就好比此時(shí)我們要直接操作內(nèi)存,原生操作就好比C++或C,可以直接操作內(nèi)存,但是同時(shí)用完后還要自己寫(xiě)各種異常處理和釋放內(nèi)存;代碼封裝就好比Java,我們只需要告訴Java我們要使用內(nèi)存,然后用完就不用管
  • 企業(yè)中,業(yè)務(wù)功能產(chǎn)出是一級(jí)優(yōu)先級(jí),在此之上才能有更高級(jí)的東西。技術(shù)服務(wù)于業(yè)務(wù),而不是業(yè)務(wù)服務(wù)于技術(shù)!比如現(xiàn)在30個(gè)人的系統(tǒng),我們要使用緩存加速訪問(wèn),那么我們是選擇 內(nèi)部緩存(直接用集合或者map存起來(lái))還是用Redis?
    • 內(nèi)部緩存和Redis能不能達(dá)到目的?能
    • 哪個(gè)更方便更快??jī)?nèi)部緩存!內(nèi)部對(duì)象就很快實(shí)現(xiàn)
    • 如果業(yè)務(wù)發(fā)展遲早會(huì)轉(zhuǎn)為Redis這種專業(yè)的緩存中間件,就好比業(yè)務(wù)發(fā)展前第一種,業(yè)務(wù)發(fā)展后選擇第二種,但是對(duì)于大部分業(yè)務(wù)系統(tǒng)來(lái)說(shuō)功能增加是很快的,特別是產(chǎn)品同事上一分鐘提需求下一分鐘就要上線這種(開(kāi)個(gè)玩笑~),所以我們?cè)谝靡粋€(gè)技術(shù)需不需要進(jìn)行二次封裝時(shí)需要技術(shù)負(fù)責(zé)人對(duì)業(yè)務(wù)增長(zhǎng)有一個(gè)預(yù)判。建議是都進(jìn)行封裝一下

1.2 封裝的抽離點(diǎn)

  • 對(duì)于二次封裝,其中最主要的就是找出該框架在日常使用中所出現(xiàn)的大部分涉及到的操作,然后找出變化操作和不變化操作
  • RocketMQ日常使用主要場(chǎng)景為例:
    • 發(fā)送消息階段:準(zhǔn)備需要發(fā)送的消息、發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
    • 消費(fèi)消息階段:記錄接收消息日志、業(yè)務(wù)處理、業(yè)務(wù)日志記錄、異常處理、異常重試、異常通知、死信處理
  • 提取變化點(diǎn)和不變化點(diǎn)(可以抽取為公共處理的場(chǎng)景)
    • 發(fā)送消息階段:
      • 變化點(diǎn):準(zhǔn)備需要發(fā)送的消息
      • 不變化點(diǎn):發(fā)送消息、記錄原始消息日志、發(fā)送失敗處理、可靠性處理
    • 消費(fèi)消息階段
      • 變化點(diǎn):業(yè)務(wù)處理、業(yè)務(wù)日志記錄
      • 不變化點(diǎn):記錄接收消息日志、異常處理、異常重試、異常通知、死信處理
  • 從上可以看到,對(duì)于RocketMQ的使用,大部分場(chǎng)景都是可以抽離成一個(gè)公共的方法處理,只有業(yè)務(wù)級(jí)的需要自己處理,所以如果我們把不變化場(chǎng)景抽取后,每個(gè)同事只需要寫(xiě)自己業(yè)務(wù)相關(guān)部分即可
  • 抽取后的復(fù)雜度:對(duì)于新加一個(gè)消費(fèi)者,只需要處理業(yè)務(wù)相關(guān)三個(gè)場(chǎng)景(準(zhǔn)備需要發(fā)送的消息、業(yè)務(wù)處理、業(yè)務(wù)日志記錄),剩下的九個(gè)場(chǎng)景,只需要封裝一次就可以。需要現(xiàn)在就幾十個(gè)消費(fèi)者,可以想想一些減少了多少代碼冗余

1.3 設(shè)計(jì)模式的應(yīng)用

  • 要封裝出一個(gè)好的抽象層,【設(shè)計(jì)模式】建議好好體會(huì)和學(xué)習(xí)一下
  • 設(shè)計(jì)模式對(duì)于用不到的人來(lái)說(shuō)比較虛幻,對(duì)于用的到的人來(lái)說(shuō),這個(gè)真牛X

二、二次封裝核心要點(diǎn)

2.1 二次封裝核心點(diǎn)

2.1.1 封裝主要討論點(diǎn)

  • 對(duì)于RocketMQ或者說(shuō)對(duì)于整個(gè)MQ體系來(lái)說(shuō)(不管是RabbitMQ、RocketMQ、Kafka)等封裝的核心主要有兩個(gè):發(fā)送消息、消費(fèi)消息者兩個(gè)場(chǎng)景
  • 對(duì)于RocketMQ我們主要討論三個(gè)地方:RocketMQTemplate封裝、RocketMQListener封裝和廣播消息的封裝
  • 廣播消息是分布式系統(tǒng)中同時(shí)讓所有節(jié)點(diǎn)都干一件事情的一個(gè)好的方式,如果用不到忽略廣播消息即可

2.1.2 發(fā)送/消費(fèi)的幾種消息實(shí)體

  • RocketMQ發(fā)送消息對(duì)于不同的使用來(lái)說(shuō),大部分選擇下面的幾種發(fā)送消息類型
    • A、發(fā)送Json對(duì)象,比如Fastjson的JSONObject
    • B、直接發(fā)送轉(zhuǎn)Json后的String對(duì)象
    • C、根據(jù)業(yè)務(wù)封裝對(duì)應(yīng)實(shí)體類
    • D、直接使用原生MessageExt接收
  • 怎么選擇?怎么選擇才是最優(yōu)?
    • 上面哪一種都可以達(dá)到目的,如果要統(tǒng)一封裝就必須要有一個(gè)標(biāo)準(zhǔn)
    • 怎么選擇只需要回答這個(gè)問(wèn)題:在不看消息發(fā)送者的情況下,消費(fèi)者怎么知道發(fā)送者發(fā)送的消息含義?
    • 比如現(xiàn)在有一個(gè)訂單消息,如果我們不看消息發(fā)送者,怎么知道發(fā)送者給消費(fèi)者發(fā)送哪些字段
      • A、B、D可以嗎?一定不可以!JSON對(duì)象和String對(duì)象,如果我們不看消息發(fā)送者不可能知道到底發(fā)送了啥,這點(diǎn)我相信沒(méi)有可以討論的地方,因?yàn)轭愋蜎Q定了這個(gè)操作不可能
      • C可以嗎?可以!此時(shí)不需要看消息發(fā)送者,只需要看消費(fèi)者的實(shí)體類點(diǎn)進(jìn)去,有哪些業(yè)務(wù)字段一清二楚
      • 可能有杠要抬了,有看實(shí)體類的功夫,我看消息發(fā)送者都看完了
        • 靈魂拷問(wèn)1:如果消息發(fā)送者和消費(fèi)者不在一個(gè)系統(tǒng)怎么看?邪魅一笑,不同業(yè)務(wù)線可能沒(méi)代碼權(quán)限吧?分布式系統(tǒng)完全獨(dú)立可能吧?
        • 靈魂拷問(wèn)2:如果現(xiàn)在需要一個(gè)功能,如果某些必須要的字段消息發(fā)送者如果沒(méi)有給的話需要校驗(yàn),普通String和JSONObject怎么實(shí)現(xiàn)?換成實(shí)體類呢?
  • 基于上述討論點(diǎn),封裝建議基于實(shí)體類來(lái),實(shí)體類不管是排查問(wèn)題、新人熟悉系統(tǒng)代碼、信息校驗(yàn)等String和JSONObject無(wú)法像實(shí)體類一樣輕松勝任

2.2 RocketMQTemplate封裝

2.2.1 封裝基礎(chǔ)實(shí)體類

  • 基礎(chǔ)消息實(shí)體類封裝了除了業(yè)務(wù)消息外所有其他公共字段,主要看下面代碼中的字段和注釋
  • 基礎(chǔ)抽象消息實(shí)體,包含基礎(chǔ)的消息、根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
    • 其中也可以包含所有消費(fèi)者可能用得到的方法等,比如做些數(shù)據(jù)的加解密
package com.codecoord.rocketmq.domain;

import lombok.Data;

import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 基礎(chǔ)消息實(shí)體,包含基礎(chǔ)的消息
 * 根據(jù)自己的業(yè)務(wù)消息設(shè)置更多的字段
 *
 * @author tianxincoord@163.com
 * @since 2022/6/16
 */
@Data
public abstract class BaseMqMessage {
    /**
     * 業(yè)務(wù)鍵,用于RocketMQ控制臺(tái)查看消費(fèi)情況
     */
    protected String key;
    /**
     * 發(fā)送消息來(lái)源,用于排查問(wèn)題
     */
    protected String source = "";
    /**
     * 發(fā)送時(shí)間
     */
    protected LocalDateTime sendTime = LocalDateTime.now();
    /**
     * 跟蹤id,用于slf4j等日志記錄跟蹤id,方便查詢業(yè)務(wù)鏈
     */
    protected String traceId = UUID.randomUUID().toString();
    /**
     * 重試次數(shù),用于判斷重試次數(shù),超過(guò)重試次數(shù)發(fā)送異常警告
     */
    protected Integer retryTimes = 0;
}
  • 有了此基礎(chǔ)抽象實(shí)體類,那么剩下的所有業(yè)務(wù)消息實(shí)體只需要繼承此基類,然后在自己業(yè)務(wù)類中包含自己需要的字段即可,因?yàn)檫@些公共字段不管是向上轉(zhuǎn)型還是向下轉(zhuǎn)型,子類和父類都可以看得到

2.2.2 RocketMQTemplate

  • RocketMQTemplate發(fā)送消息的代碼如果不封裝,我們發(fā)送消息需要這樣
    • String destination = topic + ":" + tag;
    • template.syncSend(destination, message);
  • 每個(gè)人發(fā)送消息都要自己處理這個(gè)冒號(hào),直接傳入topic和tag不香嗎?按照抽離變化點(diǎn)中的變化點(diǎn),只有消息是變化的,除此之外的其他規(guī)則交給封裝類
  • RocketMQTemplate主要封裝發(fā)送消息的日志、異常的處理、消息key設(shè)置、等等其他配置
  • 封裝代碼類如下,下面包含了主要發(fā)送方式,更多自己添加即可
    • 這里就是消息發(fā)送的點(diǎn)餐機(jī)器,同時(shí)也提供了封裝方法也提供原始RocketMQTemplate供使用
    • 此處只是提供一種方式,生產(chǎn)中按照項(xiàng)目組商量決定
package com.codecoord.rocketmq.template;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RocketMQ模板類
 *
 * @author tianxincoord@163.com
 * @since 2022/4/15
 */
@Component
public class RocketMqTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate template;

    /**
     * 獲取模板,如果封裝的方法不夠提供原生的使用方式
     */
    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 構(gòu)建目的地
     */
    public String buildDestination(String topic, String tag) {
        return topic + RocketMqSysConstant.DELIMITER + tag;
    }

    /**
     * 發(fā)送同步消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message) {
        // 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進(jìn)行處理
        // 更多的其它基礎(chǔ)業(yè)務(wù)處理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集
        LOGGER.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 發(fā)送延遲消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        LOGGER.info("[{}]延遲等級(jí)[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
        return sendResult;
    }
}
  • 這個(gè)類是最基礎(chǔ)的原始封裝類,相當(dāng)于餐館提供的點(diǎn)餐服務(wù)。上面提供無(wú)業(yè)務(wù)特性的發(fā)送,比如想要發(fā)送日志消息或者動(dòng)態(tài)發(fā)送消息目的場(chǎng)景

3.2.3 增強(qiáng)RocketMQTemplate

  • 以訂單處理中心來(lái)說(shuō),變化點(diǎn)僅僅只是單號(hào)等業(yè)務(wù)數(shù)據(jù)不一樣,發(fā)往訂單處理中心的消息不管是topic還是tag等等其實(shí)完全都一樣,那么此時(shí)可以根據(jù)業(yè)務(wù)來(lái)增加封裝
  • 增強(qiáng)原始功能需要注意下面兩個(gè)點(diǎn)
    • 所有父類能出現(xiàn)的地方,子類都能出現(xiàn):也就是子類擁有功能 >= 父類 ,比如Java的List,只要入?yún)⑹荓ist的地方,傳ArrayList和LinkedList完全可以
    • 增強(qiáng)功能不能改變?cè)脊δ艿男袨?/strong>:比如父類有一個(gè)方法say是說(shuō)話,結(jié)果子類覆寫(xiě)了say改成了行為是吃飯,然后當(dāng)調(diào)用者調(diào)用say的時(shí)候得到了一個(gè)完全預(yù)期外的結(jié)果
  • 就以訂單中心消息發(fā)送為例,封裝OrderMessageTemplate繼承自RocketMqTemplate,此時(shí)前者就擁有了封裝父類的所有基礎(chǔ)方法,擁有了所有父類的功能。然后可以在前者增加自身業(yè)務(wù)特性的發(fā)送方法,比如發(fā)送訂單處理消息
package com.codecoord.rocketmq.template;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * 訂單類發(fā)送消息模板工具類
 *
 * @author tianxincode@163.com
 * @since 2022/6/16
 */
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
    /// 如果不采用繼承也可以直接注入使用
    /* @Resource
    private RocketMqTemplate rocketMqTemplate; */

    /**
     * 入?yún)⒅恍枰獋魅胧悄膫€(gè)訂單號(hào)和業(yè)務(wù)體消息即可,其他操作根據(jù)需要處理
     * 這樣對(duì)于調(diào)用者而言,可以更加簡(jiǎn)化調(diào)用
     */
    public SendResult sendOrderPaid(@NotNull String orderId, String body) {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        message.setKey(orderId);
        message.setSource("訂單支付");
        message.setMessage(body);
        // 這兩個(gè)字段只是為了測(cè)試
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
    }
}
  • 此時(shí)對(duì)于調(diào)用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息發(fā)送到訂單處理中心
  • 封裝后的好處,假如現(xiàn)在有10個(gè)訂單來(lái)源,現(xiàn)在需要調(diào)整消息發(fā)送格式,如果不進(jìn)行封裝那么10個(gè)來(lái)源發(fā)送的地方都需要改;如果進(jìn)行了二次封裝,只需要改sendOrderPaid方法即可,而且還不會(huì)出錯(cuò),此時(shí)優(yōu)勢(shì)就體現(xiàn)出來(lái)了

2.3 RocketMQListener封裝

  • RocketMQListener是消費(fèi)消息的核心,同時(shí)也涉及到更多的操作,比如:基礎(chǔ)日志記錄、異常處理、消息重試、警告通知等等等
  • 按照抽離變化點(diǎn),RocketMQListener只應(yīng)該處理與自身業(yè)務(wù)相關(guān)的,除此之外的其它應(yīng)該交給父類,子類只需要告訴父類要不要異常處理、要不要重試等等,點(diǎn)餐式服務(wù)
  • 封裝消息消費(fèi)的抽象類
    • 注意泛型限定為標(biāo)準(zhǔn)基礎(chǔ)消息類,這樣能到消費(fèi)者的一定有統(tǒng)一的標(biāo)準(zhǔn)類BaseMqMessage
    • 下面簡(jiǎn)單封裝示例
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;

/**
 * 抽象消息監(jiān)聽(tīng)器,封裝了所有公共處理業(yè)務(wù),如
 * 1、基礎(chǔ)日志記錄
 * 2、異常處理
 * 3、消息重試
 * 4、警告通知
 * 5、....
 *
 * @author tianxincoord@163.com
 * @since 2022/4/17
 */
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
    /**
     * 這里的日志記錄器是哪個(gè)子類的就會(huì)被哪個(gè)子類的類進(jìn)行初始化
     */
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RocketMqTemplate rocketMqTemplate;

    /**
     * 消息者名稱
     *
     * @return 消費(fèi)者名稱
     */
    protected abstract String consumerName();

    /**
     * 消息處理
     *
     * @param message 待處理消息
     * @throws Exception 消費(fèi)異常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超過(guò)重試次數(shù)消息,需要啟用isRetry
     *
     * @param message 待處理消息
     */
    protected abstract void overMaxRetryTimesMessage(T message);
    /**
     * 是否過(guò)濾消息,例如某些
     *
     * @param message 待處理消息
     * @return true: 本次消息被過(guò)濾,false:不過(guò)濾
     */
    protected boolean isFilter(T message) {
        return false;
    }

    /**
     * 是否異常時(shí)重復(fù)發(fā)送
     *
     * @return true: 消息重試,false:不重試
     */
    protected abstract boolean isRetry();

    /**
     * 消費(fèi)異常時(shí)是否拋出異常
     *
     * @return true: 拋出異常,false:消費(fèi)異常(如果沒(méi)有開(kāi)啟重試則消息會(huì)被自動(dòng)ack)
     */
    protected abstract boolean isThrowException();

    /**
     * 最大重試次數(shù)
     *
     * @return 最大重試次數(shù),默認(rèn)10次
     */
    protected int maxRetryTimes() {
        return 10;
    }

    /**
     * isRetry開(kāi)啟時(shí),重新入隊(duì)延遲時(shí)間
     *
     * @return -1:立即入隊(duì)重試
     */
    protected int retryDelayLevel() {
        return -1;
    }

    /**
     * 由父類來(lái)完成基礎(chǔ)的日志和調(diào)配,下面的只是提供一個(gè)思路
     */
    public void dispatchMessage(T message) {
        MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
        // 基礎(chǔ)日志記錄被父類處理了
        logger.info("[{}]消費(fèi)者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
        if (isFilter(message)) {
            logger.info("消息不滿足消費(fèi)條件,已過(guò)濾");
            return;
        }
        // 超過(guò)最大重試次數(shù)時(shí)調(diào)用子類方法處理
        if (message.getRetryTimes() > maxRetryTimes()) {
            overMaxRetryTimesMessage(message);
            return;
        }
        try {
            long start = Instant.now().toEpochMilli();
            handleMessage(message);
            long end = Instant.now().toEpochMilli();
            logger.info("消息消費(fèi)成功,耗時(shí)[{}ms]", (end - start));
        } catch (Exception e) {
            logger.error("消息消費(fèi)異常", e);
            // 是捕獲異常還是拋出,由子類決定
            if (isThrowException()) {
                throw new RuntimeException(e);
            }
            if (isRetry()) {
                // 獲取子類RocketMQMessageListener注解拿到topic和tag
                RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
                if (Objects.nonNull(annotation)) {
                    message.setSource(message.getSource() + "消息重試");
                    message.setRetryTimes(message.getRetryTimes() + 1);
                    SendResult sendResult;
                    try {
                        // 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時(shí)不走延遲消息)
                        // 此處捕獲之后,相當(dāng)于此條消息被消息完成然后重新發(fā)送新的消息
                        sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    // 發(fā)送失敗的處理就是不進(jìn)行ACK,由RocketMQ重試
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RuntimeException("重試消息發(fā)送失敗");
                    }
                }
            }
        }
    }
}
  • 封裝消費(fèi)最終類
    • 注意:收到的消息是先委派給父類,父類進(jìn)行調(diào)度管理
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 實(shí)體類消費(fèi)監(jiān)聽(tīng)器,在實(shí)現(xiàn)RocketMQListener中間還加了一層BaseMqMessageListener來(lái)處理基礎(chǔ)業(yè)務(wù)消息
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定消費(fèi)者線程數(shù),默認(rèn)64,生產(chǎn)中請(qǐng)注意配置,避免過(guò)大或者過(guò)小
        consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
                                         implements RocketMQListener<RocketMqEntityMessage> {
    /**
     * 此處只是說(shuō)明封裝的思想,更多還是要根據(jù)業(yè)務(wù)操作決定
     * 內(nèi)功心法有了,無(wú)論什么招式都可以發(fā)揮最大威力
     */
    @Override
    protected String consumerName() {
        return "RocketMQ二次封裝消息消費(fèi)者";
    }

    @Override
    public void onMessage(RocketMqEntityMessage message) {
        // 注意,此時(shí)這里沒(méi)有直接處理業(yè)務(wù),而是先委派給父類做基礎(chǔ)操作,然后父類做完基礎(chǔ)操作后會(huì)調(diào)用子類的實(shí)際處理類型
        super.dispatchMessage(message);
    }

    @Override
    protected void handleMessage(RocketMqEntityMessage message) throws Exception {
        // 此時(shí)這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試
        System.out.println("業(yè)務(wù)消息處理");
    }

    @Override
    protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
        // 當(dāng)超過(guò)指定重試次數(shù)消息時(shí)此處方法會(huì)被調(diào)用
        // 生產(chǎn)中可以進(jìn)行回退或其他業(yè)務(wù)操作
    }

    @Override
    protected boolean isRetry() {
        return false;
    }

    @Override
    protected int maxRetryTimes() {
        // 指定需要的重試次數(shù),超過(guò)重試次數(shù)overMaxRetryTimesMessage會(huì)被調(diào)用
        return 5;
    }

    @Override
    protected boolean isThrowException() {
        // 是否拋出異常,到消費(fèi)異常時(shí)是被父類攔截處理還是直接拋出異常
        return false;
    }
}
  • 封裝后對(duì)于子類來(lái)說(shuō),只需要告訴父類要不要做就擁有了最開(kāi)始說(shuō)的所有功能,簡(jiǎn)化了使用,此時(shí)子類消費(fèi)者只需要專注于自己的業(yè)務(wù)核心處理就可以了

2.4 廣播消息的應(yīng)用場(chǎng)景

  • 應(yīng)用場(chǎng)景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
  • 也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時(shí)候,此時(shí)可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無(wú)需一個(gè)節(jié)點(diǎn)一個(gè)節(jié)點(diǎn)的處理
  • 特別說(shuō)明:廣播消息默認(rèn)會(huì)在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會(huì)以www.tianxincoord.com:9876@www.tianxincoord.com:9876這種地址形式生成文件路徑,但是由于帶有:符號(hào),windows下是不允許此符號(hào)作為文件夾名稱的,所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋,否則啟動(dòng)的時(shí)候就會(huì)提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個(gè)問(wèn)題
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 廣播消息
 * 應(yīng)用場(chǎng)景:多租戶或者服務(wù)有內(nèi)部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
 *      也就是需要所有服務(wù)節(jié)點(diǎn)都需要同事做某一件事情的時(shí)候
 * 此時(shí)可以借助廣播消息發(fā)送消息到所有節(jié)點(diǎn)刷新,無(wú)需一個(gè)節(jié)點(diǎn)一個(gè)節(jié)點(diǎn)的處理
 *
 * 特別說(shuō)明:廣播消息默認(rèn)會(huì)在家目錄下創(chuàng)建消費(fèi)進(jìn)度文件,會(huì)以www.tianxincoord.com:9876@www.tianxincoord.com:9876
 *      這種地址形式生成文件路徑,但是由于帶有:符號(hào),windows下是不允許此符號(hào)作為文件夾名稱的
 *      所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋
 *      否則啟動(dòng)的時(shí)候就會(huì)提示目標(biāo)卷或者路徑不存在,其實(shí)是因?yàn)檫@個(gè)問(wèn)題
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
        // messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {

    /**
     * MessageExt:內(nèi)置的消息實(shí)體,生產(chǎn)中根據(jù)需要自己封裝實(shí)體
     */
    @Override
    public void onMessage(MessageExt message) {
        log.info("收到廣播消息【{}】", new String(message.getBody()));
    }
}

2.3 代碼封裝完結(jié)測(cè)試

封裝測(cè)試大家可以直接參考RocketMqController即可

package com.codecoord.rocketmq.controller;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 消息發(fā)送
 *
 * @author tianxin01@huice.com
 * @since 2022/6/16
 */
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    /**
     * 注意此處注入的是封裝的RocketMqTemplate
     */
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    /**
     * 注入對(duì)應(yīng)業(yè)務(wù)的模板類
     */
    @Resource
    private OrderMessageTemplate orderMessageTemplate;

    /**
     * 通過(guò)實(shí)體類發(fā)送消息,發(fā)送注意事項(xiàng)請(qǐng)參考實(shí)體類
     * 說(shuō)明:也可以在RocketMqTemplate按照業(yè)務(wù)封裝發(fā)送方法,這樣只需要調(diào)用方法指定基礎(chǔ)業(yè)務(wù)消息接口
     */
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        // 設(shè)置業(yè)務(wù)key
        message.setKey(UUID.randomUUID().toString());
        // 設(shè)置消息來(lái)源,便于查詢we年
        message.setSource("封裝測(cè)試");
        // 業(yè)務(wù)消息內(nèi)容
        message.setMessage("當(dāng)前消息發(fā)送時(shí)間為:" + LocalDateTime.now());
        // Java時(shí)間字段需要單獨(dú)處理,否則會(huì)序列化失敗
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
    }

    /**
     * 此時(shí)對(duì)于調(diào)用者而且,無(wú)需創(chuàng)建任何類
     * 如果某天需要調(diào)整消息發(fā)送來(lái)源,如果不封裝,所有原來(lái)產(chǎn)生message的地方全部改
     * 如果封裝了,只需要改sendOrderPaid就可以切換
     */
    @RequestMapping("/order/paid")
    public Object sendOrderPaidMessage() {
        return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客戶下單了...,快快備貨");
    }

    /**
     * 直接將對(duì)象進(jìn)行傳輸,也可以自己進(jìn)行json轉(zhuǎn)化后傳輸
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        // 生產(chǎn)中不推薦使用jsonObject傳遞,不看發(fā)送者無(wú)法知道傳遞的消息包含什么信息
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
        // 如果要走內(nèi)部方法發(fā)送則必須要按照標(biāo)準(zhǔn)來(lái),否則就使用原生的消息發(fā)送
        return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
    }
}

到此這篇關(guān)于RocketMQ整合SpringBoot實(shí)現(xiàn)生產(chǎn)級(jí)二次封裝的文章就介紹到這了,更多相關(guān)RocketMQ SpringBoot封裝內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot Starter依賴原理與實(shí)例詳解

    SpringBoot Starter依賴原理與實(shí)例詳解

    SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動(dòng)掃描到要加載的信息并啟動(dòng)相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫(kù)的處理,需要配置各種信息的困擾
    2022-09-09
  • mybatis水平分表實(shí)現(xiàn)動(dòng)態(tài)表名的項(xiàng)目實(shí)例

    mybatis水平分表實(shí)現(xiàn)動(dòng)態(tài)表名的項(xiàng)目實(shí)例

    本文主要介紹了mybatis水平分表實(shí)現(xiàn)動(dòng)態(tài)表名的項(xiàng)目實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • idea如何將指定目錄打成jar包

    idea如何將指定目錄打成jar包

    這篇文章主要介紹了idea如何將指定目錄打成jar包問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Java運(yùn)行Jar包內(nèi)存配置的操作

    Java運(yùn)行Jar包內(nèi)存配置的操作

    這篇文章主要介紹了Java運(yùn)行Jar包內(nèi)存配置的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-01-01
  • JavaWeb利用struts實(shí)現(xiàn)文件下載時(shí)改變文件名稱

    JavaWeb利用struts實(shí)現(xiàn)文件下載時(shí)改變文件名稱

    這篇文章主要為大家詳細(xì)介紹了JavaWeb利用struts實(shí)現(xiàn)文件下載時(shí)改變文件名稱的相關(guān)資料,需要的朋友可以參考下
    2016-06-06
  • Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能

    Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能

    這篇文章主要介紹了Java基于ArrayList實(shí)現(xiàn)群主發(fā)紅包功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-09-09
  • 詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    詳解SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動(dòng)時(shí),預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,下面我們就來(lái)看看SpringBoot是如何實(shí)現(xiàn)緩存預(yù)熱的吧
    2024-01-01
  • Mockito mock Kotlin Object類方法報(bào)錯(cuò)解決方法

    Mockito mock Kotlin Object類方法報(bào)錯(cuò)解決方法

    這篇文章主要介紹了Mockito mock Kotlin Object類方法報(bào)錯(cuò)解決方法,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-09-09
  • 理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的

    理解Java注解及Spring的@Autowired是如何實(shí)現(xiàn)的

    今天通過(guò)本文帶領(lǐng)大家學(xué)習(xí)注解的基礎(chǔ)知識(shí),學(xué)習(xí)Spring的@Autowired是怎么實(shí)現(xiàn)的,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2021-07-07
  • Redis 集成Spring的示例代碼(spring-data-redis)

    Redis 集成Spring的示例代碼(spring-data-redis)

    本篇文章主要介紹了Redis 集成Spring的示例代碼(spring-data-redis) ,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-09-09

最新評(píng)論