欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

淺談Springboot整合RocketMQ使用心得

 更新時(shí)間:2018年01月15日 16:44:49   作者:HenryZhou2  
本篇文章主要介紹了Springboot整合RocketMQ使用心得,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

一、阿里云官網(wǎng)---幫助文檔

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官網(wǎng)步驟,創(chuàng)建Topic、申請(qǐng)發(fā)布(生產(chǎn)者)、申請(qǐng)訂閱(消費(fèi)者)

二、代碼

1、配置:

public class MqConfig {
  /**
   * 啟動(dòng)測(cè)試之前請(qǐng)?zhí)鎿Q如下 XXX 為您的配置
   */
  public static final String PUBLIC_TOPIC = "test";//公網(wǎng)測(cè)試
  public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
  public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";

  public static final String ACCESS_KEY = "123";
  public static final String SECRET_KEY = "123";
  public static final String TAG = "";
  public static final String THREAD_NUM = "25";//消費(fèi)端線程數(shù)
  /**
   * ONSADDR 請(qǐng)根據(jù)不同Region進(jìn)行配置
   * 公網(wǎng)測(cè)試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
   * 公有云生產(chǎn): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
   */
  public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

ONSADDR 阿里云用 公有云生產(chǎn),測(cè)試用公網(wǎng)

不同的業(yè)務(wù)可以設(shè)置不同的tag,但是如果發(fā)送消息量大的話,建議新建TOPIC

2、生產(chǎn)者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
     init-method="start" destroy-method="shutdown">
    <property name="properties">
      <map>
        <entry key="ProducerId" value="" /> <!-- PID,請(qǐng)?zhí)鎿Q -->
        <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,請(qǐng)?zhí)鎿Q -->
        <entry key="SecretKey" value="" /> <!-- SECRET_KEY,請(qǐng)?zhí)鎿Q -->
        <!--PropertyKeyConst.ONSAddr 請(qǐng)根據(jù)不同Region進(jìn)行配置
         公網(wǎng)測(cè)試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         公有云生產(chǎn): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
        <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
      </map>
    </property>
  </bean>
</beans>

啟動(dòng)方式1,在使用類的全局里設(shè)置:

//初始化生產(chǎn)者
  private ApplicationContext ctx;
  private ProducerBean producer;

  @Value("${producerConfig.enabled}")//開(kāi)關(guān),spring配置項(xiàng),true為開(kāi)啟,false關(guān)閉
  private boolean producerConfigEnabled;

  @PostConstruct
  public void init(){
    if (true == producerConfigEnabled) {
      ctx = new ClassPathXmlApplicationContext("producer.xml");
      producer = (ProducerBean) ctx.getBean("producer");
    }
  }

PS:最近發(fā)現(xiàn)一個(gè)坑,如果producer用上面這種方式啟動(dòng)的話,一旦啟動(dòng)的多了,會(huì)造成fullGC,所以可以換成下面這種注解方式啟動(dòng),在用到的地方手動(dòng)start、shutdown

方式2:配置類(不需要xml)

@Configuration
public class ProducerBeanConfig {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  private ProducerBean producerBean;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  @Bean
  public ProducerBean oneProducer() {
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);

    producerBean.setProperties(properties);
    return producerBean;
  }
}

PS:經(jīng)過(guò)這次雙11發(fā)現(xiàn),以上2種方式在大數(shù)據(jù)量,多線程情況下都不太適用, 性能很差,所以推薦用3

方式3:(不需要xml)

@Component
public class ProducerBeanSingleTon {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  private static Producer producer;

  private static class SingletonHolder {
    private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
  }

  private ProducerBeanSingleTon (){}

  public static final ProducerBeanSingleTon getInstance() {
    return SingletonHolder.INSTANCE;
  }

  @PostConstruct
  public void init(){
    // producer 實(shí)例配置初始化
    Properties properties = new Properties();
    //您在控制臺(tái)創(chuàng)建的Producer ID
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    // AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    //設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
    // 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例)
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
    producer = ONSFactory.createProducer(properties);
    // 在發(fā)送消息前,必須調(diào)用start方法來(lái)啟動(dòng)Producer,只需調(diào)用一次即可
    producer.start();
  }

  public Producer getProducer(){
    return producer;
  }
}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

consumerConfig.enabled = true

producerConfig.enabled = true #方式1:

scheduling.enabled = false

#方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E
openservices.ons.producerBean.producerId = pid
openservices.ons.producerBean.accessKey = 
openservices.ons.producerBean.secretKey = 

openservices.ons.producerBean.ONSAddr = 公網(wǎng)、杭州公有云生產(chǎn)

方式1投遞消息代碼:

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有異常,休眠1秒
   }

方式2投遞消息代碼:(可以每發(fā)1000個(gè)啟動(dòng)/關(guān)閉一次)

   producerBean.start();
try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有異常,休眠1秒
   }

   producerBean.shutdown();

方式3:投遞消息

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;

   } else {
     logger.warn("DoubleElevenMidService.sendResult is null.........");
   }
   } catch (Exception e) {
     logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e);
     Thread.sleep(1000);//如果有異常,休眠1秒
   }

發(fā)送消息的代碼一定要捕獲異常,不然會(huì)重復(fù)發(fā)送。

這里的TOPIC用自己創(chuàng)建的,elevenMessage是要發(fā)送的內(nèi)容,我這里是自己建的對(duì)象

3、消費(fèi)者

配置啟動(dòng)類:

@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {

  private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());

  @Bean
  public Consumer consumerFactory(){//不同消費(fèi)者 這里不能重名
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
    //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new對(duì)應(yīng)的監(jiān)聽(tīng)器
    consumer.start();
    logger.info("ConsumerConfig start success.");
    

    return consumer;

  }
}

CID和ONSADDR一點(diǎn)要選對(duì),用自己的,消費(fèi)者線程數(shù)等可以在這里配置

創(chuàng)建消息監(jiān)聽(tīng)器類,消費(fèi)消息:

@Component
public class MessageListener implements MessageListener {
  private Logger logger = LoggerFactory.getLogger("remind");

  protected static ElevenReposity elevenReposity;
  @Resource
  public void setElevenReposity(ElevenReposity elevenReposity){
    MessageListener .elevenReposity=elevenReposity;
  }


  @Override
  public Action consume(Message message, ConsumeContext consumeContext) {

    if(message.getTopic().equals("自己的TOPIC")){//避免消費(fèi)到其他消息 json轉(zhuǎn)換報(bào)錯(cuò)
      try {

      byte[] body = message.getBody();
      String res = new String(body);
      
      //res 是生產(chǎn)者傳過(guò)來(lái)的消息內(nèi)容

        //業(yè)務(wù)代碼

      }else{
        logger.warn("!");
      }

      } catch (Exception e) {
        logger.error("MessageListener.consume error:" + e.getMessage(), e);
      }

      logger.info("MessageListener.Receive message”);
      //如果想測(cè)試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
      return Action.CommitMessage;
    }else{
      logger.warn();
      return Action.ReconsumeLater;
    }

  }

注意,由于消費(fèi)者是多線程的,所以對(duì)象要用static+set注入,把對(duì)象的級(jí)別提升到進(jìn)程,這樣多個(gè)線程就可以共用,但是無(wú)法調(diào)用父類的方法和變量

消費(fèi)者狀態(tài)可以查看消費(fèi)者是否連接成功,消費(fèi)是否延遲,消費(fèi)速度等

重置消費(fèi)位點(diǎn)可以清空所有消息

三、注意事項(xiàng)

1、發(fā)送的消息體 最大為256KB

2、消息最多存在3天

3、消費(fèi)端默認(rèn)線程數(shù)是20

4、如果運(yùn)行過(guò)程中出現(xiàn)java掛掉或者cpu占用異常高,可以在發(fā)送消息的時(shí)候,每發(fā)送1000條讓線程休息1s

5、本地測(cè)試或啟動(dòng)的時(shí)候,把ONSADDR換成公網(wǎng),不然報(bào)錯(cuò)無(wú)法啟動(dòng)

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • java的Guava工具包介紹

    java的Guava工具包介紹

    Java開(kāi)發(fā)的同學(xué)應(yīng)該都使用或者聽(tīng)說(shuō)過(guò)Google提供的Guava工具包。日常使用最多的肯定是集合相關(guān)的工具類,還有Guava cache,除了這些之外Guava還提供了很多有用的功能,鑒于日常想用的時(shí)候找不到,這里就梳理一下Guava中那些好用的工具類,想優(yōu)化代碼的時(shí)候不妨過(guò)來(lái)看看
    2021-04-04
  • 用java實(shí)現(xiàn)楊輝三角的示例代碼

    用java實(shí)現(xiàn)楊輝三角的示例代碼

    本篇文章主要介紹了用java實(shí)現(xiàn)楊輝三角的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-01-01
  • Java 邏輯控制詳解分析

    Java 邏輯控制詳解分析

    在程序開(kāi)發(fā)的過(guò)程之中一共會(huì)存在有三種程序邏輯:順序結(jié)構(gòu)、分支結(jié)構(gòu)、循環(huán)結(jié)構(gòu),對(duì)于之前所編寫(xiě)的代碼大部分都是順序結(jié)構(gòu)的定義,即:所有的程序?qū)凑斩x的代碼順序依次執(zhí)行
    2021-11-11
  • 詳解Java多線程處理List數(shù)據(jù)

    詳解Java多線程處理List數(shù)據(jù)

    這篇文章主要介紹了Java多線程處理List數(shù)據(jù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • 基于java Springboot實(shí)現(xiàn)教務(wù)管理系統(tǒng)詳解

    基于java Springboot實(shí)現(xiàn)教務(wù)管理系統(tǒng)詳解

    這篇文章主要介紹了Java 實(shí)現(xiàn)簡(jiǎn)易教務(wù)管理系統(tǒng)的代碼,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08
  • Spring 日志規(guī)范及作用

    Spring 日志規(guī)范及作用

    日志是在系統(tǒng)運(yùn)行過(guò)程中關(guān)鍵的節(jié)點(diǎn)的數(shù),這個(gè)些日志的記錄方便當(dāng)系統(tǒng)出現(xiàn)問(wèn)題方便問(wèn)題查找,這篇文章主要介紹了Spring 日志規(guī)范及作用,需要的朋友可以參考下
    2024-03-03
  • Spring實(shí)戰(zhàn)之ResourceLoaderAware加載資源用法示例

    Spring實(shí)戰(zhàn)之ResourceLoaderAware加載資源用法示例

    這篇文章主要介紹了Spring實(shí)戰(zhàn)之ResourceLoaderAware加載資源用法,結(jié)合實(shí)例形式分析了spring使用ResourceLoaderAware加載資源相關(guān)配置與操作技巧,需要的朋友可以參考下
    2020-01-01
  • 淺談Java 將圖片打包到j(luò)ar中的路徑問(wèn)題

    淺談Java 將圖片打包到j(luò)ar中的路徑問(wèn)題

    下面小編就為大家分享一篇淺談Java 將圖片打包到j(luò)ar中的路徑問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-02-02
  • mybatis項(xiàng)目兼容mybatis-plus問(wèn)題

    mybatis項(xiàng)目兼容mybatis-plus問(wèn)題

    這篇文章主要介紹了mybatis項(xiàng)目兼容mybatis-plus問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • SpringBoot 利用RestTemplate http測(cè)試

    SpringBoot 利用RestTemplate http測(cè)試

    這篇文章主要介紹了SpringBoot 利用RestTemplate http測(cè)試,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08

最新評(píng)論