解決springboot集成rocketmq關(guān)于tag的坑
springboot集成rocketmq關(guān)于tag的坑
新項目使用springboot的若依框架集成rocketmq,選擇集成RocketMQTemplate這種方式實現(xiàn)消息的發(fā)送和接收。
1.客戶端發(fā)送代碼
此處回調(diào)方法里有些業(yè)務(wù)不用關(guān)注,只關(guān)心發(fā)送方法
@Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private RocketMQTemplate rocketMQTemplate; public void send(ReqMsg msg){ rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(), msg.getMsg(), new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { logger.debug("msgid:{} 發(fā)送成功" , sendResult.getMsgId()); logger.debug("發(fā)送mq成功后要執(zhí)行的service: {}",msg.getMsg().getSendAfterMethod()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),sendResult,"0")); } @Override public void onException(Throwable throwable) { logger.error("mq發(fā)送異常!{}",throwable.toString()); logger.debug("發(fā)送mq失敗后執(zhí)行的service: {}",msg.getMsg().getSendAfterMethod()); //異常描述截取500 length入庫 msg.getMsg().putUserProperty("exceptionDesc",throwable.toString()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),"1")); } }); } }
2.服務(wù)端監(jiān)聽消息
@Service @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", selectorExpression="${rocketmq.tags}") public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> { Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class); @Override public void onMessage(CbiMsg message) { String msgBody = new String(message.getBody()); String serviceName = message.getTags(); logger.info("本次消費服務(wù)名稱:{}",serviceName); AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName); saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message, Constants.CONSUME_SUCCESS));//默認(rèn)消費成功 } }
@RocketMQMessageListener這個注解里selectorExpression默認(rèn)是*,接收topic下全部消息。想動態(tài)對tags進(jìn)行配置。于是利用springboot獲取yml配置。寫死的時候沒有問題,但是改成$表達(dá)式配置后怎么都收不到消息,經(jīng)排查居然是selectorExpression這個不支持配置,會原封的按表達(dá)式進(jìn)入MQ容器初始化。然而注解里面的topic,comsumerGroup都可以正常拿到配置值。
翻源碼發(fā)現(xiàn)問題所在,項目啟動時,在ListenerContainerConfiguration在這個類里初始化mq容器時,對配置進(jìn)行賦值
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders (annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; }
topic和comsumerGroup都在springboot環(huán)境里獲取配置值了,唯獨selectorExpression這個沒有,直接默認(rèn)注解里的。下面的問題就是需要自己在項目啟動,springboot容器起來,但是rocketmq容器未起的時候,動態(tài)去改注解里配置的值。然后讓Rocketmq啟動。
** * 因為RocketMQMessageListener不提供動態(tài)配置功能 * springboot初始化后rocket容器初始化前利用反射動態(tài)改變 * RocketMQMessageListener注解selectorExpression的值 * * */ @Component public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean { @Value("${rocketmq.consumer.tags}") private String tags; @Override public void afterPropertiesSet() throws Exception { RocketMQMessageListener annoTable = CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class); // 獲取代理處理器 InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable); // 獲取私有 memberValues 屬性 Field f = invocationHandler.getClass().getDeclaredField("memberValues"); f.setAccessible(true); // 獲取實例的屬性map Map<String, Object> memberValues = (Map<String, Object>) f.get(invocationHandler); // 修改屬性值 memberValues.put("selectorExpression", tags); } }
問題解決。。
SpringBoot集成RocketMQ及報錯處理
項目場景:
說明:springBoot集成RocketMQ開發(fā)
環(huán)境:阿里云+Centos8+RocketMQ+SpringBoot+Docker
啟動:docker start rmqserver rmqbroker[因為RocketMQ安裝在Docket容器中,所以這樣啟動]
服務(wù)器broker.conf配置信息:
brokerIP1=外網(wǎng)ip namesrvAddr=外網(wǎng)ip:9876 brokerName=broker_tanhua autoCreateTopicEnable=true
說明:
1.brokerIP1 當(dāng)前broker監(jiān)聽的IP
2.Broker是RocketMq的核心,負(fù)責(zé)消息的傳遞(提供者=》消費者)以及消息的持久化存儲,消息的HA機制以及服務(wù)器過濾功能。
3.autoCreateTopicEnable:自動創(chuàng)建Topic路由
問題一描述:
我第一次配置時,broker.conf配置文件中沒有配置autoCreateTopicEnable,因此在程序運行時會提示沒有路由信息:No route info of this topic: tanhua-sso-login
我發(fā)送消息路由名字是tanhua-sso-login
錯誤信息:
No route info of this topic: tanhua-sso-login
錯誤信息截圖:我沒有截圖網(wǎng)上找了一個,差不多
解決方式:
我當(dāng)時也在網(wǎng)上找了很多,有在啟動時添加自動創(chuàng)建的也有說防火墻開啟的原因,但是我感覺會這個的話應(yīng)該都知道關(guān)防火墻。
在啟動時添加自動創(chuàng)建可能也好使,但是我沒試過,因為我在搜索時發(fā)現(xiàn)問題統(tǒng)一指向說沒有自動創(chuàng)建,因此我想的是直接在配置文件中進(jìn)行修改,然后重啟
解決方式:
在broker.conf配置文件中添加如下配置:
autoCreateTopicEnable=true
SpringBoot集成信息:
application.properties:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
【注】:這里配置的開通沒有spring,我之前加spring怎么也連接不上
pom.xml:
<!--RocketMQ相關(guān)--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.1</version> </dependency>
問題二描述:
我在修改上面的錯誤后,緊接著又報
錯誤信息:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
錯誤信息截圖:也是沒有截圖網(wǎng)上找了一個,差不多
思路:錯誤信息中提示call timeout,timeout一般想到到時連接或響應(yīng)超時,因此在網(wǎng)上找到的是在發(fā)送MQ時出錯,網(wǎng)上解決方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot連接MQ時的配置設(shè)置
解決方案:添加rocketmq.producer.send-message-timeout= 6000
說明:給大一點發(fā)送信息超時時間。
說明:同時在SpringBoot集成RoctetMQ配置中沒有sendMsgTimeout因此用rocketmq=>輸入'.'=>輸入sendtimeout=>查看有哪些關(guān)于這個的配置。
完整配置:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Apache Commons fileUpload文件上傳多個示例分享
這篇文章主要為大家分享了Apache Commons fileUpload文件上傳4個示例,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-10-10