欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring Boot優(yōu)雅使用RocketMQ的方法實例

 更新時間:2019年12月29日 10:13:09   作者:SimpleWu  
這篇文章主要給大家介紹了關(guān)于Spring Boot優(yōu)雅使用RocketMQ的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Spring Boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧

前言

MQ,是一種跨進程的通信機制,用于上下游傳遞消息。在傳統(tǒng)的互聯(lián)網(wǎng)架構(gòu)中通常使用MQ來對上下游來做解耦合。

舉例:當A系統(tǒng)對B系統(tǒng)進行消息通訊,如A系統(tǒng)發(fā)布一條系統(tǒng)公告,B系統(tǒng)可以訂閱該頻道進行系統(tǒng)公告同步,整個過程中A系統(tǒng)并不關(guān)系B系統(tǒng)會不會同步,由訂閱該頻道的系統(tǒng)自行處理。

什么是RocketMQ?#

官方說明:

隨著使用越來越多的隊列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過節(jié)流,斷路器或降級來解決此問題,但效果不佳。因此,我們那時開始關(guān)注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。

具有以下特性:

  • 支持發(fā)布/訂閱(Pub/Sub)和點對點(P2P)消息模型
  • 能夠保證嚴格的消息順序,在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  • 提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式
  • 單一隊列百萬消息的堆積能力,億級消息堆積能力
  • 支持多種消息協(xié)議,如 JMS、MQTT 等
  • 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義

RocketMQ環(huán)境安裝#

下載地址:https://rocketmq.apache.org/dowloading/releases/

從官方下載二進制或者源碼來進行使用。源碼編譯需要Maven3.2x,JDK8

在根目錄進行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夾中會存在一個文件夾版,zip,tar三個可運行的完整程序。

使用rocketmq-4.6.0.zip:

  • 啟動名稱服務(wù) mqnamesrv.cmd
  • 啟動數(shù)據(jù)中心 mqbroker.cmd -n localhost:9876

SpringBoot環(huán)境中使用RocketMQ#

SpringBoot 入門:http://www.dbjr.com.cn/article/177449.htm

SpringBoot 常用start:http://www.dbjr.com.cn/article/177451.htm

當前環(huán)境版本為:

  • SpringBoot 2.0.6.RELEASE
  • SpringCloud Finchley.RELEASE
  • SpringCldod Alibaba 0.2.1.RELEASE
  • RocketMQ 4.3.0

在項目工程中導入:

<!-- MQ Begin -->
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>${rocketmq.version}</version>
</dependency>
<!-- MQ End -->

由于我們這邊已經(jīng)有工程了所以就不在進行創(chuàng)建這種過程了。主要是看看如何使用RocketMQ。

創(chuàng)建RocketMQProperties配置屬性類,類中內(nèi)容如下:

@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
 private boolean isEnable = false;
 private String namesrvAddr = "localhost:9876";
 private String groupName = "default";
 private int producerMaxMessageSize = 1024;
 private int producerSendMsgTimeout = 2000;
 private int producerRetryTimesWhenSendFailed = 2;
 private int consumerConsumeThreadMin = 5;
 private int consumerConsumeThreadMax = 30;
 private int consumerConsumeMessageBatchMaxSize = 1;
 //省略get set
}

現(xiàn)在我們所有子系統(tǒng)中的生產(chǎn)者,消費者對應(yīng):

isEnable 是否開啟mq

namesrvAddr 集群地址

groupName 分組名稱

設(shè)置為統(tǒng)一已方便系統(tǒng)對接,如有其它需求在進行擴展,類中我們已經(jīng)給了默認值也可以在配置文件或配置中心中獲取配置,配置如下:

#發(fā)送同一類消息的設(shè)置為同一個group,保證唯一,默認不需要設(shè)置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
rocketmq.groupName=please_rename_unique_group_name
#是否開啟自動配置
rocketmq.isEnable=true
#mq的nameserver地址
rocketmq.namesrvAddr=127.0.0.1:9876
#消息最大長度 默認1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#發(fā)送消息超時時間,默認3000
rocketmq.producer.sendMsgTimeout=3000
#發(fā)送消息失敗重試次數(shù),默認2
rocketmq.producer.retryTimesWhenSendFailed=2
#消費者線程數(shù)量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
#設(shè)置一次消費消息的條數(shù),默認為1條
rocketmq.consumer.consumeMessageBatchMaxSize=1

創(chuàng)建消費者接口 RocketConsumer.java 該接口用戶約束消費者需要的核心步驟:

/**
 * 消費者接口
 * 
 * @author SimpleWu
 *
 */
public interface RocketConsumer {

/**
 * 初始化消費者
 */
 public abstract void init();

 /**
 * 注冊監(jiān)聽
 * 
 * @param messageListener
 */
 public void registerMessageListener(MessageListener messageListener);

}

創(chuàng)建抽象消費者 AbstractRocketConsumer.java:

/**
 * 消費者基本信息
 * 
 * @author SimpelWu
 */
public abstract class AbstractRocketConsumer implements RocketConsumer {

 protected String topics;
 protected String tags;
 protected MessageListener messageListener;
 protected String consumerTitel;
 protected MQPushConsumer mqPushConsumer;

 /**
 * 必要的信息
 * 
 * @param topics
 * @param tags
 * @param consumerTitel
 */
 public void necessary(String topics, String tags, String consumerTitel) {
 this.topics = topics;
 this.tags = tags;
 this.consumerTitel = consumerTitel;
 }

 public abstract void init();

 @Override
 public void registerMessageListener(MessageListener messageListener) {
 this.messageListener = messageListener;
 }
 
}

在類中我們必須指定這個topics,tags與消息監(jiān)聽邏輯

public abstract void init();該方法是用于初始化消費者,由子類實現(xiàn)。

接下來我們編寫自動配置類RocketMQConfiguation.java,該類用戶初始化一個默認的生產(chǎn)者連接,以及加載所有的消費者。

@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件

@Configuration 標注為配置類

@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當配置中指定rocketmq.isEnable = true的時候才會生效

核心內(nèi)容如下:

/**
 * mq配置
 * 
 * @author SimpleWu
 */
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {

 private RocketMQProperties properties;

 private ApplicationContext applicationContext;

 private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);

 public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
 this.properties = properties;
 this.applicationContext = applicationContext;
 }

 /**
 * 注入一個默認的消費者
 * @return
 * @throws MQClientException
 */
 @Bean
 public DefaultMQProducer getRocketMQProducer() throws MQClientException {
 if (StringUtils.isEmpty(properties.getGroupName())) {
  throw new MQClientException(-1, "groupName is blank");
 }

 if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
  throw new MQClientException(-1, "nameServerAddr is blank");
 }
 DefaultMQProducer producer;
 producer = new DefaultMQProducer(properties.getGroupName());

 producer.setNamesrvAddr(properties.getNamesrvAddr());
 // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

 // 如果需要同一個jvm中不同的producer往不同的mq集群發(fā)送消息,需要設(shè)置不同的instanceName
 // producer.setInstanceName(instanceName);
 producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
 producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
 // 如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認為2次
 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());

 try {
  producer.start();
  log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
   properties.getNamesrvAddr());
 } catch (MQClientException e) {
  log.error(String.format("producer is error {}", e.getMessage(), e));
  throw e;
 }
 return producer;

 }

 /**
 * SpringBoot啟動時加載所有消費者
 */
 @PostConstruct
 public void initConsumer() {
 Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
 if (consumers == null || consumers.size() == 0) {
  log.info("init rocket consumer 0");
 }
 Iterator<String> beans = consumers.keySet().iterator();
 while (beans.hasNext()) {
  String beanName = (String) beans.next();
  AbstractRocketConsumer consumer = consumers.get(beanName);
  consumer.init();
  createConsumer(consumer);
  log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
   consumer.topics);
 }
 }

 /**
 * 通過消費者信心創(chuàng)建消費者
 * 
 * @param consumerPojo
 */
 public void createConsumer(AbstractRocketConsumer arc) {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
 consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
 consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
 consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
 consumer.registerMessageListener(arc.messageListenerConcurrently);
 /**
  * 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
  */
 // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 /**
  * 設(shè)置消費模型,集群還是廣播,默認為集群
  */
 // consumer.setMessageModel(MessageModel.CLUSTERING);

 /**
  * 設(shè)置一次消費消息的條數(shù),默認為1條
  */
 consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
 try {
  consumer.subscribe(arc.topics, arc.tags);
  consumer.start();
  arc.mqPushConsumer=consumer;
 } catch (MQClientException e) {
  log.error("info consumer title {}", arc.consumerTitel, e);
 }

 }

}

然后在src/main/resources文件夾中創(chuàng)建目錄與文件META-INF/spring.factories里面添加自動配置類即可開啟啟動配置,我們只需要導入依賴即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation

接下來在服務(wù)中導入依賴,然后通過我們的抽象類獲取所有必要信息對消費者進行創(chuàng)建,該步驟會在所有消費者初始化完成后進行,且只會管理是Spring Bean的消費者。

下面我們看看如何創(chuàng)建一個消費者,創(chuàng)建消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費者的創(chuàng)建,我們可以在類中自定義消費的主題與標簽。

在項目可以根據(jù)需求當消費者創(chuàng)建失敗的時候是否繼續(xù)啟動工程。

創(chuàng)建一個默認的消費者 DefaultConsumerMQ.java

@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {
 /**
 * 初始化消費者
 */
 @Override
 public void init() {
 // 設(shè)置主題,標簽與消費者標題
 super.necessary("TopicTest", "*", "這是標題");
 //消費者具體執(zhí)行邏輯
 registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  msgs.forEach(msg -> {
   System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
  });
  // 標記該消息已經(jīng)被成功消費
  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
 });
 }
}

super.necessary("TopicTest", "*", "這是標題"); 是必須要設(shè)置的,代表該消費者監(jiān)聽TopicTest主題下所有tags,標題那個字段是我自己定義的,所以對于該配置來說沒什么意義。

我們可以在這里注入Spring的Bean來進行任意邏輯處理。

創(chuàng)建一個消息發(fā)送類進行測試

@Override
public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
 Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
 // 發(fā)送消息到一個Broker
 SendResult sendResult = defaultMQProducer.send(msg);
 // 通過sendResult返回消息是否成功送達
 System.out.printf("%s%n", sendResult);
 return null;
}

我們來通過Http請求測試:

http://localhost:10001/demo/base/mq/hello consumer message boyd hello 
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿 

好了到這里簡單的start算是設(shè)計完成了,后面還有一些:順序消息生產(chǎn),順序消費消息,異步消息生產(chǎn)等一系列功能,官人可參照官方去自行處理。

  • ActiveMQ 沒經(jīng)過大規(guī)模吞吐量場景的驗證,社區(qū)不高不活躍。
  • RabbitMQ 集群動態(tài)擴展麻煩,且與當前程序語言不至于難以定制化。
  • kafka 支持主要的MQ功能,功能無法達到程序需求的要求,所以不使用,且與當前程序語言不至于難以定制化。
  • rocketMQ 經(jīng)過全世界的女人的洗禮,已經(jīng)很強大;MQ功能較為完善,還是分布式的,擴展性好;支持復雜MQ業(yè)務(wù)場景。(業(yè)務(wù)復雜可做首選)

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對腳本之家的支持。

相關(guān)文章

  • Netty如何設(shè)置為Https訪問

    Netty如何設(shè)置為Https訪問

    這篇文章主要介紹了Netty如何設(shè)置為Https訪問,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Springboot啟動原理詳細講解

    Springboot啟動原理詳細講解

    這篇文章主要介紹了SpringBoot啟動原理的分析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • 使用Java自制一個一個Nacos

    使用Java自制一個一個Nacos

    Nacos是?Dynamic?Naming?and?Configuration?Service的首字母簡稱,一個更易于構(gòu)建云原生應(yīng)用的動態(tài)服務(wù)發(fā)現(xiàn)、配置管理和服務(wù)管理平臺,本文將嘗試用Java實現(xiàn)一個Nacos,感興趣的可以了解下
    2024-01-01
  • springboot集成swagger過程解析

    springboot集成swagger過程解析

    這篇文章主要介紹了springboot集成swagger過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • java顯示目錄文件列表和刪除目錄功能

    java顯示目錄文件列表和刪除目錄功能

    這篇文章主要介紹了java顯示目錄文件列表和刪除目錄功能,文章通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下
    2017-12-12
  • zookeeper的Leader選舉機制源碼解析

    zookeeper的Leader選舉機制源碼解析

    這篇文章主要為大家介紹了zookeeper的Leader選舉源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03
  • Elasticsearch?Analyzer?內(nèi)置分詞器使用示例詳解

    Elasticsearch?Analyzer?內(nèi)置分詞器使用示例詳解

    這篇文章主要為大家介紹了Elasticsearch?Analyzer?內(nèi)置分詞器使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-11-11
  • springboot中request和response的加解密實現(xiàn)代碼

    springboot中request和response的加解密實現(xiàn)代碼

    這篇文章主要介紹了springboot中request和response的加解密實現(xiàn),在springboot中提供了RequestBodyAdviceAdapter和ResponseBodyAdvice,利用這兩個工具可以非常方便的對請求和響應(yīng)進行預處理,需要的朋友可以參考下
    2022-06-06
  • java 出現(xiàn)Zipexception 異常的解決辦法

    java 出現(xiàn)Zipexception 異常的解決辦法

    這篇文章主要介紹了java 出現(xiàn)Zipexception 異常的解決辦法的相關(guān)資料,出現(xiàn) java.util.zip.ZipException: error in opening zip file 異常的原因及解決方法,需要的朋友可以參考下
    2017-08-08
  • mybatis plus條件構(gòu)造器queryWrapper、updateWrapper

    mybatis plus條件構(gòu)造器queryWrapper、updateWrapper

    這篇文章主要介紹了mybatis plus條件構(gòu)造器queryWrapper、updateWrapper,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09

最新評論