淺談Springboot整合RocketMQ使用心得
一、阿里云官網(wǎng)---幫助文檔
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh
按照官網(wǎng)步驟,創(chuàng)建Topic、申請(qǐng)發(fā)布(生產(chǎn)者)、申請(qǐng)訂閱(消費(fèi)者)
二、代碼
1、配置:
public class MqConfig {
/**
* 啟動(dòng)測(cè)試之前請(qǐng)?zhí)鎿Q如下 XXX 為您的配置
*/
public static final String PUBLIC_TOPIC = "test";//公網(wǎng)測(cè)試
public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";
public static final String ACCESS_KEY = "123";
public static final String SECRET_KEY = "123";
public static final String TAG = "";
public static final String THREAD_NUM = "25";//消費(fèi)端線程數(shù)
/**
* ONSADDR 請(qǐng)根據(jù)不同Region進(jìn)行配置
* 公網(wǎng)測(cè)試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
* 公有云生產(chǎn): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
* 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
* 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
*/
public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}
ONSADDR 阿里云用 公有云生產(chǎn),測(cè)試用公網(wǎng)
不同的業(yè)務(wù)可以設(shè)置不同的tag,但是如果發(fā)送消息量大的話,建議新建TOPIC
2、生產(chǎn)者
方式1:
配置文件:producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
init-method="start" destroy-method="shutdown">
<property name="properties">
<map>
<entry key="ProducerId" value="" /> <!-- PID,請(qǐng)?zhí)鎿Q -->
<entry key="AccessKey" value="" /> <!-- ACCESS_KEY,請(qǐng)?zhí)鎿Q -->
<entry key="SecretKey" value="" /> <!-- SECRET_KEY,請(qǐng)?zhí)鎿Q -->
<!--PropertyKeyConst.ONSAddr 請(qǐng)根據(jù)不同Region進(jìn)行配置
公網(wǎng)測(cè)試: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
公有云生產(chǎn): http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
<entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
</map>
</property>
</bean>
</beans>
啟動(dòng)方式1,在使用類(lèi)的全局里設(shè)置:
//初始化生產(chǎn)者
private ApplicationContext ctx;
private ProducerBean producer;
@Value("${producerConfig.enabled}")//開(kāi)關(guān),spring配置項(xiàng),true為開(kāi)啟,false關(guān)閉
private boolean producerConfigEnabled;
@PostConstruct
public void init(){
if (true == producerConfigEnabled) {
ctx = new ClassPathXmlApplicationContext("producer.xml");
producer = (ProducerBean) ctx.getBean("producer");
}
}
PS:最近發(fā)現(xiàn)一個(gè)坑,如果producer用上面這種方式啟動(dòng)的話,一旦啟動(dòng)的多了,會(huì)造成fullGC,所以可以換成下面這種注解方式啟動(dòng),在用到的地方手動(dòng)start、shutdown
方式2:配置類(lèi)(不需要xml)
@Configuration
public class ProducerBeanConfig {
@Value("${openservices.ons.producerBean.producerId}")
private String producerId;
@Value("${openservices.ons.producerBean.accessKey}")
private String accessKey;
@Value("${openservices.ons.producerBean.secretKey}")
private String secretKey;
private ProducerBean producerBean;
@Value("${openservices.ons.producerBean.ONSAddr}")
private String ONSAddr;
@Bean
public ProducerBean oneProducer() {
ProducerBean producerBean = new ProducerBean();
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ProducerId, producerId);
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producerBean.setProperties(properties);
return producerBean;
}
}
PS:經(jīng)過(guò)這次雙11發(fā)現(xiàn),以上2種方式在大數(shù)據(jù)量,多線程情況下都不太適用, 性能很差,所以推薦用3
方式3:(不需要xml)
@Component
public class ProducerBeanSingleTon {
@Value("${openservices.ons.producerBean.producerId}")
private String producerId;
@Value("${openservices.ons.producerBean.accessKey}")
private String accessKey;
@Value("${openservices.ons.producerBean.secretKey}")
private String secretKey;
@Value("${openservices.ons.producerBean.ONSAddr}")
private String ONSAddr;
private static Producer producer;
private static class SingletonHolder {
private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
}
private ProducerBeanSingleTon (){}
public static final ProducerBeanSingleTon getInstance() {
return SingletonHolder.INSTANCE;
}
@PostConstruct
public void init(){
// producer 實(shí)例配置初始化
Properties properties = new Properties();
//您在控制臺(tái)創(chuàng)建的Producer ID
properties.setProperty(PropertyKeyConst.ProducerId, producerId);
// AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例)
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producer = ONSFactory.createProducer(properties);
// 在發(fā)送消息前,必須調(diào)用start方法來(lái)啟動(dòng)Producer,只需調(diào)用一次即可
producer.start();
}
public Producer getProducer(){
return producer;
}
}
spring配置
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect consumerConfig.enabled = true producerConfig.enabled = true #方式1: scheduling.enabled = false #方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E openservices.ons.producerBean.producerId = pid openservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.ONSAddr = 公網(wǎng)、杭州公有云生產(chǎn)
方式1投遞消息代碼:
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
SendResult sendResult = producer.send(message);
if (sendResult != null) {
logger.info(".Send mq message success!”;
} else {
logger.warn(".sendResult is null.........");
}
} catch (Exception e) {
logger.warn("DoubleElevenAllPreService");
Thread.sleep(1000);//如果有異常,休眠1秒
}
方式2投遞消息代碼:(可以每發(fā)1000個(gè)啟動(dòng)/關(guān)閉一次)
producerBean.start();
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
SendResult sendResult = producer.send(message);
if (sendResult != null) {
logger.info(".Send mq message success!”;
} else {
logger.warn(".sendResult is null.........");
}
} catch (Exception e) {
logger.warn("DoubleElevenAllPreService");
Thread.sleep(1000);//如果有異常,休眠1秒
}
producerBean.shutdown();
方式3:投遞消息
try {
String jsonC = JsonUtils.toJson(elevenMessage);
Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
SendResult sendResult = producer.send(message);
if (sendResult != null) {
logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;
} else {
logger.warn("DoubleElevenMidService.sendResult is null.........");
}
} catch (Exception e) {
logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e);
Thread.sleep(1000);//如果有異常,休眠1秒
}
發(fā)送消息的代碼一定要捕獲異常,不然會(huì)重復(fù)發(fā)送。
這里的TOPIC用自己創(chuàng)建的,elevenMessage是要發(fā)送的內(nèi)容,我這里是自己建的對(duì)象
3、消費(fèi)者
配置啟動(dòng)類(lèi):
@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {
private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());
@Bean
public Consumer consumerFactory(){//不同消費(fèi)者 這里不能重名
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
//consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new對(duì)應(yīng)的監(jiān)聽(tīng)器
consumer.start();
logger.info("ConsumerConfig start success.");
return consumer;
}
}
CID和ONSADDR一點(diǎn)要選對(duì),用自己的,消費(fèi)者線程數(shù)等可以在這里配置
創(chuàng)建消息監(jiān)聽(tīng)器類(lèi),消費(fèi)消息:
@Component
public class MessageListener implements MessageListener {
private Logger logger = LoggerFactory.getLogger("remind");
protected static ElevenReposity elevenReposity;
@Resource
public void setElevenReposity(ElevenReposity elevenReposity){
MessageListener .elevenReposity=elevenReposity;
}
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
if(message.getTopic().equals("自己的TOPIC")){//避免消費(fèi)到其他消息 json轉(zhuǎn)換報(bào)錯(cuò)
try {
byte[] body = message.getBody();
String res = new String(body);
//res 是生產(chǎn)者傳過(guò)來(lái)的消息內(nèi)容
//業(yè)務(wù)代碼
}else{
logger.warn("!");
}
} catch (Exception e) {
logger.error("MessageListener.consume error:" + e.getMessage(), e);
}
logger.info("MessageListener.Receive message”);
//如果想測(cè)試消息重投的功能,可以將Action.CommitMessage 替換成Action.ReconsumeLater
return Action.CommitMessage;
}else{
logger.warn();
return Action.ReconsumeLater;
}
}
注意,由于消費(fèi)者是多線程的,所以對(duì)象要用static+set注入,把對(duì)象的級(jí)別提升到進(jìn)程,這樣多個(gè)線程就可以共用,但是無(wú)法調(diào)用父類(lèi)的方法和變量

消費(fèi)者狀態(tài)可以查看消費(fèi)者是否連接成功,消費(fèi)是否延遲,消費(fèi)速度等
重置消費(fèi)位點(diǎn)可以清空所有消息
三、注意事項(xiàng)
1、發(fā)送的消息體 最大為256KB
2、消息最多存在3天
3、消費(fèi)端默認(rèn)線程數(shù)是20
4、如果運(yùn)行過(guò)程中出現(xiàn)java掛掉或者cpu占用異常高,可以在發(fā)送消息的時(shí)候,每發(fā)送1000條讓線程休息1s
5、本地測(cè)試或啟動(dòng)的時(shí)候,把ONSADDR換成公網(wǎng),不然報(bào)錯(cuò)無(wú)法啟動(dòng)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- springBoot整合RocketMQ及坑的示例代碼
- SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收的詳細(xì)步驟
- Springboot RocketMq實(shí)現(xiàn)過(guò)程詳解
- 解決SpringBoot整合RocketMQ遇到的坑
- SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息
- Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
- SpringBoot集成RocketMQ的使用示例
- SpringBoot項(xiàng)目嵌入RocketMQ的實(shí)現(xiàn)示例
- SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼
相關(guān)文章
java算法之Math.random()隨機(jī)概率玩法實(shí)例演示
最近打算整理排序算法,發(fā)現(xiàn)很有必要準(zhǔn)備一下生成隨機(jī)數(shù)的工具類(lèi),下面這篇文章主要給大家介紹了關(guān)于java算法之Math.random()隨機(jī)概率玩法的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-05-05
IDEA+maven+SpringBoot+JPA+Thymeleaf實(shí)現(xiàn)Crud及分頁(yè)
這篇文章主要介紹了不需要電腦任何操作基于IDEA + maven + SpringBoot + JPA + Thymeleaf實(shí)現(xiàn)CRUD及分頁(yè),需要的朋友可以參考下2018-03-03
快速搭建Spring Boot+MyBatis的項(xiàng)目IDEA(附源碼下載)
這篇文章主要介紹了快速搭建Spring Boot+MyBatis的項(xiàng)目IDEA(附源碼下載),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12
springboot讀取bootstrap配置及knife4j版本兼容性問(wèn)題及解決
這篇文章主要介紹了springboot讀取bootstrap配置及knife4j版本兼容性問(wèn)題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
Netty分布式固定長(zhǎng)度解碼器實(shí)現(xiàn)原理剖析
這篇文章主要為大家介紹了Netty分布式固定長(zhǎng)度解碼器原理剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
Java位運(yùn)算和邏輯運(yùn)算的區(qū)別實(shí)例
Java位運(yùn)算和邏輯運(yùn)算的區(qū)別實(shí)例,請(qǐng)參考下面代碼,希望對(duì)你有所幫助2013-02-02
Java微信公眾平臺(tái)開(kāi)發(fā)(12) 微信用戶(hù)信息的獲取
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺(tái)開(kāi)發(fā)第十二步,微信用戶(hù)信息的獲取,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04
SpringCloud基于RestTemplate微服務(wù)項(xiàng)目案例解析
這篇文章主要介紹了SpringCloud基于RestTemplate微服務(wù)項(xiàng)目案例,在寫(xiě)SpringCloud搭建微服務(wù)之前,先搭建一個(gè)不通過(guò)springcloud只通過(guò)SpringBoot和Mybatis進(jìn)行模塊之間通訊,通過(guò)一個(gè)案例給大家詳細(xì)說(shuō)明,需要的朋友可以參考下2022-05-05

