解決springboot集成rocketmq關(guān)于tag的坑
springboot集成rocketmq關(guān)于tag的坑
新項(xiàng)目使用springboot的若依框架集成rocketmq,選擇集成RocketMQTemplate這種方式實(shí)現(xiàn)消息的發(fā)送和接收。
1.客戶端發(fā)送代碼
此處回調(diào)方法里有些業(yè)務(wù)不用關(guān)注,只關(guān)心發(fā)送方法
@Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private RocketMQTemplate rocketMQTemplate; public void send(ReqMsg msg){ rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(), msg.getMsg(), new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { logger.debug("msgid:{} 發(fā)送成功" , sendResult.getMsgId()); logger.debug("發(fā)送mq成功后要執(zhí)行的service: {}",msg.getMsg().getSendAfterMethod()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),sendResult,"0")); } @Override public void onException(Throwable throwable) { logger.error("mq發(fā)送異常!{}",throwable.toString()); logger.debug("發(fā)送mq失敗后執(zhí)行的service: {}",msg.getMsg().getSendAfterMethod()); //異常描述截取500 length入庫(kù) msg.getMsg().putUserProperty("exceptionDesc",throwable.toString()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),"1")); } }); } }
2.服務(wù)端監(jiān)聽(tīng)消息
@Service @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", selectorExpression="${rocketmq.tags}") public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> { Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class); @Override public void onMessage(CbiMsg message) { String msgBody = new String(message.getBody()); String serviceName = message.getTags(); logger.info("本次消費(fèi)服務(wù)名稱:{}",serviceName); AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName); saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message, Constants.CONSUME_SUCCESS));//默認(rèn)消費(fèi)成功 } }
@RocketMQMessageListener這個(gè)注解里selectorExpression默認(rèn)是*,接收topic下全部消息。想動(dòng)態(tài)對(duì)tags進(jìn)行配置。于是利用springboot獲取yml配置。寫(xiě)死的時(shí)候沒(méi)有問(wèn)題,但是改成$表達(dá)式配置后怎么都收不到消息,經(jīng)排查居然是selectorExpression這個(gè)不支持配置,會(huì)原封的按表達(dá)式進(jìn)入MQ容器初始化。然而注解里面的topic,comsumerGroup都可以正常拿到配置值。
翻源碼發(fā)現(xiàn)問(wèn)題所在,項(xiàng)目啟動(dòng)時(shí),在ListenerContainerConfiguration在這個(gè)類里初始化mq容器時(shí),對(duì)配置進(jìn)行賦值
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders (annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; }
topic和comsumerGroup都在springboot環(huán)境里獲取配置值了,唯獨(dú)selectorExpression這個(gè)沒(méi)有,直接默認(rèn)注解里的。下面的問(wèn)題就是需要自己在項(xiàng)目啟動(dòng),springboot容器起來(lái),但是rocketmq容器未起的時(shí)候,動(dòng)態(tài)去改注解里配置的值。然后讓Rocketmq啟動(dòng)。
** * 因?yàn)镽ocketMQMessageListener不提供動(dòng)態(tài)配置功能 * springboot初始化后rocket容器初始化前利用反射動(dòng)態(tài)改變 * RocketMQMessageListener注解selectorExpression的值 * * */ @Component public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean { @Value("${rocketmq.consumer.tags}") private String tags; @Override public void afterPropertiesSet() throws Exception { RocketMQMessageListener annoTable = CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class); // 獲取代理處理器 InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable); // 獲取私有 memberValues 屬性 Field f = invocationHandler.getClass().getDeclaredField("memberValues"); f.setAccessible(true); // 獲取實(shí)例的屬性map Map<String, Object> memberValues = (Map<String, Object>) f.get(invocationHandler); // 修改屬性值 memberValues.put("selectorExpression", tags); } }
問(wèn)題解決。。
SpringBoot集成RocketMQ及報(bào)錯(cuò)處理
項(xiàng)目場(chǎng)景:
說(shuō)明:springBoot集成RocketMQ開(kāi)發(fā)
環(huán)境:阿里云+Centos8+RocketMQ+SpringBoot+Docker
啟動(dòng):docker start rmqserver rmqbroker[因?yàn)镽ocketMQ安裝在Docket容器中,所以這樣啟動(dòng)]
服務(wù)器broker.conf配置信息:
brokerIP1=外網(wǎng)ip namesrvAddr=外網(wǎng)ip:9876 brokerName=broker_tanhua autoCreateTopicEnable=true
說(shuō)明:
1.brokerIP1 當(dāng)前broker監(jiān)聽(tīng)的IP
2.Broker是RocketMq的核心,負(fù)責(zé)消息的傳遞(提供者=》消費(fèi)者)以及消息的持久化存儲(chǔ),消息的HA機(jī)制以及服務(wù)器過(guò)濾功能。
3.autoCreateTopicEnable:自動(dòng)創(chuàng)建Topic路由
問(wèn)題一描述:
我第一次配置時(shí),broker.conf配置文件中沒(méi)有配置autoCreateTopicEnable,因此在程序運(yùn)行時(shí)會(huì)提示沒(méi)有路由信息:No route info of this topic: tanhua-sso-login
我發(fā)送消息路由名字是tanhua-sso-login
錯(cuò)誤信息:
No route info of this topic: tanhua-sso-login
錯(cuò)誤信息截圖:我沒(méi)有截圖網(wǎng)上找了一個(gè),差不多
解決方式:
我當(dāng)時(shí)也在網(wǎng)上找了很多,有在啟動(dòng)時(shí)添加自動(dòng)創(chuàng)建的也有說(shuō)防火墻開(kāi)啟的原因,但是我感覺(jué)會(huì)這個(gè)的話應(yīng)該都知道關(guān)防火墻。
在啟動(dòng)時(shí)添加自動(dòng)創(chuàng)建可能也好使,但是我沒(méi)試過(guò),因?yàn)槲以谒阉鲿r(shí)發(fā)現(xiàn)問(wèn)題統(tǒng)一指向說(shuō)沒(méi)有自動(dòng)創(chuàng)建,因此我想的是直接在配置文件中進(jìn)行修改,然后重啟
解決方式:
在broker.conf配置文件中添加如下配置:
autoCreateTopicEnable=true
SpringBoot集成信息:
application.properties:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
【注】:這里配置的開(kāi)通沒(méi)有spring,我之前加spring怎么也連接不上
pom.xml:
<!--RocketMQ相關(guān)--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.1</version> </dependency>
問(wèn)題二描述:
我在修改上面的錯(cuò)誤后,緊接著又報(bào)
錯(cuò)誤信息:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
錯(cuò)誤信息截圖:也是沒(méi)有截圖網(wǎng)上找了一個(gè),差不多
思路:錯(cuò)誤信息中提示call timeout,timeout一般想到到時(shí)連接或響應(yīng)超時(shí),因此在網(wǎng)上找到的是在發(fā)送MQ時(shí)出錯(cuò),網(wǎng)上解決方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot連接MQ時(shí)的配置設(shè)置
解決方案:添加rocketmq.producer.send-message-timeout= 6000
說(shuō)明:給大一點(diǎn)發(fā)送信息超時(shí)時(shí)間。
說(shuō)明:同時(shí)在SpringBoot集成RoctetMQ配置中沒(méi)有sendMsgTimeout因此用rocketmq=>輸入'.'=>輸入sendtimeout=>查看有哪些關(guān)于這個(gè)的配置。
完整配置:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
手把手帶你實(shí)現(xiàn)一個(gè)萌芽版的Spring容器
大家好,我是老三,Spring是我們最常用的開(kāi)源框架,經(jīng)過(guò)多年發(fā)展,Spring已經(jīng)發(fā)展成枝繁葉茂的大樹(shù),讓我們難以窺其全貌,這節(jié),我們回歸Spring的本質(zhì),五分鐘手?jǐn)]一個(gè)Spring容器,揭開(kāi)Spring神秘的面紗2022-03-03Apache Commons fileUpload文件上傳多個(gè)示例分享
這篇文章主要為大家分享了Apache Commons fileUpload文件上傳4個(gè)示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10java swing實(shí)現(xiàn)簡(jiǎn)單計(jì)算器界面
這篇文章主要為大家詳細(xì)介紹了java swing實(shí)現(xiàn)簡(jiǎn)單計(jì)算器界面,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04