淺談使用java實現(xiàn)阿里云消息隊列簡單封裝
一、前言
最近公司有使用阿里云消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調(diào)用方式以方便內(nèi)部系統(tǒng)的調(diào)用,現(xiàn)在已經(jīng)完成,特此記錄其中過程和使用到的相關(guān)技術(shù),與君共勉。
現(xiàn)在阿里云提供了兩種消息服務(wù):mns服務(wù)和ons服務(wù),其中我認(rèn)為mns是簡化版的ons,而且mns的消息消費需要自定義輪詢策略的,相比之下,ons的發(fā)布與訂閱模式功能更加強大(比如相對于mns,ons提供了消息追蹤、日志、監(jiān)控等功能),其api使用起來更加方便,而且聽聞阿里內(nèi)部以后不再對mns進(jìn)行新的開發(fā),只做維護,ons服務(wù)則會逐步替代mns服務(wù)成為阿里消息服務(wù)的主打產(chǎn)品,所以,如果有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。
涉及到的技術(shù):Spring,反射、動態(tài)代理、Jackson序列化和反序列化
在看下面的文章之前,需要先看上面的文檔以了解相關(guān)概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發(fā)送和接收代碼實現(xiàn)。
該博文只針對有消息隊列知識基礎(chǔ)的朋友看,能幫上大家的忙我自然很高興,看不懂的也不要罵,說明你路子不對。
二、設(shè)計方案
1.消息發(fā)送
在一個簡單的cs架構(gòu)中,假設(shè)server會監(jiān)聽一個Topic的Producer發(fā)送的消息,那么它首先應(yīng)該提供client一個api,client只需要簡單的調(diào)用該api,就可以通過producer來生產(chǎn)消息
2.消息接收
由于api是server制定的,所以server當(dāng)然也知道如何消費這些消息
在這個過程中,server實際充當(dāng)著消費者的角色,client實際充當(dāng)著生產(chǎn)者的角色,但是生產(chǎn)者生產(chǎn)消息的規(guī)則則由消費者制定以滿足消費者消費需求。
3.最終目標(biāo)
我們要創(chuàng)建一個單獨的jar包,起名為queue-core為生產(chǎn)者和消費者提供依賴和發(fā)布訂閱的具體實現(xiàn)。
三、消息發(fā)送
1.消費者提供接口
@Topic(name="kdyzm",producerId="kdyzm_producer") public interface UserQueueResource { @Tag("test1") public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user); @Tag("test2") public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user); }
由于Topic和producer之間是N:1的關(guān)系,所以這里直接將producerId作為Topic的一個屬性;Tag是一個很關(guān)鍵的過濾條件,消費者通過它進(jìn)行消息的分類做不同的業(yè)務(wù)處理,所以,這里使用Tag作為路由條件。
2.生產(chǎn)者使用消費者提供的api發(fā)送消息
由于消費者只提供了接口給生產(chǎn)者使用,接口是沒有辦法直接使用的,因為沒有辦法實例化,這里使用動態(tài)代理生成對象,在消費者提供的api中,添加如下config,以方便生產(chǎn)者直接導(dǎo)入config即可使用,這里使用了基于java的spring config,請知悉。
@Configuration public class QueueConfig { @Autowired @Bean public UserQueueResource userQueueResource() { return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class); } }
3.queue-core對生產(chǎn)者發(fā)送消息的封裝
以上1中所有的注解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中注解的定義只是定義了規(guī)則,真正的實現(xiàn)實際上是在QueueResourceFactory中
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.wy.queue.core.api.MQConnection; import com.wy.queue.core.utils.JacksonSerializer; import com.wy.queue.core.utils.MQUtils; import com.wy.queue.core.utils.QueueCoreSpringUtils; public class QueueResourceFactory implements InvocationHandler { private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class); private String topicName; private String producerId; private JacksonSerializer serializer=new JacksonSerializer(); private static final String PREFIX="PID_"; public QueueResourceFactory(String topicName,String producerId) { this.topicName = topicName; this.producerId=producerId; } public static <T> T createProxyQueueResource(Class<T> clazz) { String topicName = MQUtils.getTopicName(clazz); String producerId = MQUtils.getProducerId(clazz); T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(), new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId)); return target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(args.length == 0 || args.length>1){ throw new RuntimeException("only accept one param at queueResource interface."); } String tagName=MQUtils.getTagName(method); ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class); MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId); //發(fā)送消息 Message msg = new Message( // // 在控制臺創(chuàng)建的 Topic,即該消息所屬的 Topic 名稱 connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // 可理解為 Gmail 中的標(biāo)簽,對消息進(jìn)行再歸類,方便 Consumer 指定過濾條件在 MQ 服務(wù)器過濾 tagName, // Message Body // 任何二進(jìn)制形式的數(shù)據(jù), MQ 不做任何干預(yù), // 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式 serializer.serialize(args[0]).getBytes()); SendResult sendResult = producer.send(msg); logger.info("Send Message success. Message ID is: " + sendResult.getMessageId()); return null; } }
這里特意將自定義包和第三方使用的包名都貼過來了,以便于區(qū)分。
這里到底做了哪些事情呢?
發(fā)送消息的過程就是動態(tài)代理創(chuàng)建一個代理對象,該對象調(diào)用方法的時候會被攔截,首先解析所有的注解,比如topicName、producerId、tag等關(guān)鍵信息從注解中取出來,然后調(diào)用阿里sdk發(fā)送消息,過程很簡單,但是注意,這里發(fā)送消息的時候是分環(huán)境的,一般來講現(xiàn)在企業(yè)中會區(qū)分QA、staging、product三種環(huán)境,其中QA和staging是測試環(huán)境,對于消息隊列來講,也是會有三種環(huán)境的,但是QA和staging環(huán)境往往為了降低成本使用同一個阿里賬號,所以創(chuàng)建的topic和productId會放到同一個區(qū)域下,這樣同名的TopicName是不允許存在的,所以加上了環(huán)境前綴加以區(qū)分,比如QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection接口,以獲取配置信息,生產(chǎn)者服務(wù)只需要實現(xiàn)該接口即可。
4.生產(chǎn)者發(fā)送消息
@Autowired private UserQueueResource userQueueResource; @Override public void sendMessage() { UserModel userModel=new UserModel(); userModel.setName("kdyzm"); userModel.setAge(25); userQueueResource.handleUserInfo(userModel); }
只需要數(shù)行代碼即可將消息發(fā)送到指定的Topic,相對于原生的發(fā)送代碼,精簡了太多。
四、消息消費
相對于消息發(fā)送,消息的消費要復(fù)雜一些。
1.消息消費設(shè)計
由于Topic和Consumer之間是N:N的關(guān)系,所以將ConsumerId放到消費者具體實現(xiàn)的方法上
@Controller @QueueResource public class UserQueueResourceImpl implements UserQueueResource { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override @ConsumerAnnotation("kdyzm_consumer") public void handleUserInfo(UserModel user) { logger.info("收到消息1:{}", new Gson().toJson(user)); } @Override @ConsumerAnnotation("kdyzm_consumer1") public void handleUserInfo1(UserModel user) { logger.info("收到消息2:{}", new Gson().toJson(user)); } }
這里又有兩個新的注解@QueueResource和@ConsumerAnnotation,這兩個注解后續(xù)會討論如何使用。有人會問我為什么要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,因為Consumer這個名字和aliyun提供的sdk中的名字沖突了。。。。
在這里, 消費者提供api 接口給生產(chǎn)者以方便生產(chǎn)者發(fā)送消息,消費者則實現(xiàn)該接口以消費生產(chǎn)者發(fā)送的消息,如何實現(xiàn)api接口就實現(xiàn)了監(jiān)聽,這點是比較關(guān)鍵的邏輯。
2.queue-core實現(xiàn)消息隊列監(jiān)聽核心邏輯
第一步:使用sping 容器的監(jiān)聽方法獲取所有加上QueueResource注解的Bean
第二步:分發(fā)處理Bean
如何處理這些Bean呢,每個Bean實際上都是一個對象,有了對象,比如上面例子中的UserQueueResourceImpl 對象,我們可以拿到該對象實現(xiàn)的接口字節(jié)碼對象,進(jìn)而可以拿到該接口UserQueueRerousce上的注解以及方法上和方法中的注解,當(dāng)然UserQueueResourceImpl實現(xiàn)方法上的注解也能拿得到,這里我將獲取到的信息以consumerId為key,其余相關(guān)信息封裝為Value緩存到了一個Map對象中,核心代碼如下:
Class<?> clazz = resourceImpl.getClass(); Class<?> clazzIf = clazz.getInterfaces()[0]; Method[] methods = clazz.getMethods(); String topicName = MQUtils.getTopicName(clazzIf); for (Method m : methods) { ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class); if (null == consumerAnno) { // logger.error("method={} need Consumer annotation.", m.getName()); continue; } String consuerId = consumerAnno.value(); if (StringUtils.isEmpty(consuerId)) { logger.error("method={} ConsumerId can't be null", m.getName()); continue; } Class<?>[] parameterTypes = m.getParameterTypes(); Method resourceIfMethod = null; try { resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes); } catch (NoSuchMethodException | SecurityException e) { logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(), e); continue; } String tagName = MQUtils.getTagName(resourceIfMethod); consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m)); }
第三步:通過反射實現(xiàn)消費的動作
首先,先確定好反射動作執(zhí)行的時機,那就是監(jiān)聽到了新的消息
其次,如何執(zhí)行反射動作?不贅述,有反射相關(guān)基礎(chǔ)的童鞋都知道怎么做,核心代碼如下所示:
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); String topicPrefix=connectionInfo.getPrefix()+"_"; String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_"; for(String consumerId:consumersMap.keySet()){ MethodInfo methodInfo=consumersMap.get(consumerId); Properties connectionProperties=convertToProperties(connectionInfo); // 您在控制臺創(chuàng)建的 Consumer ID connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId); Consumer consumer = ONSFactory.createConsumer(connectionProperties); consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag public Action consume(Message message, ConsumeContext context) { try { String messageBody=new String(message.getBody(),"UTF-8"); logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody); Method method=methodInfo.getMethod(); Class<?> parameType = method.getParameterTypes()[0]; Object arg = jacksonSerializer.deserialize(messageBody, parameType); Object[] args={arg}; method.invoke(resourceImpl, args); } catch (Exception e) { logger.error("",e); } return Action.CommitMessage; } }); consumer.start(); logger.info("consumer={} has started.",consumerIdPrefix+consumerId); }
五、完整代碼見下面的git鏈接
https://github.com/kdyzm/queue-core.git
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
揭秘SpringBoot!一分鐘教你實現(xiàn)配置的動態(tài)神刷新
在今天的指南中,我們將深入探索SpringBoot?動態(tài)刷新的強大功能,讓你的應(yīng)用保持最新鮮的狀態(tài),想象一下,無需重啟,你的應(yīng)用就能實時更新配置,是不是很酷?跟我一起,讓我們揭開這項技術(shù)如何讓開發(fā)變得更加靈活和高效的秘密吧!2024-03-03Java動態(tài)代理機制詳解_動力節(jié)點Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了Java動態(tài)代理機制,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06詳解hashCode()和equals()的本質(zhì)區(qū)別和聯(lián)系
這篇文章主要介紹了詳解hashCode()和equals()的本質(zhì)區(qū)別和聯(lián)系,本文先對兩種方法作了介紹,然后對二者聯(lián)系進(jìn)行分析,具有一定參考價值,需要的朋友可以了解下。2017-09-09HTTP基本認(rèn)證(Basic Authentication)的JAVA實例代碼
下面小編就為大家?guī)硪黄狧TTP基本認(rèn)證(Basic Authentication)的JAVA實例代碼。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11使用@Service注解出現(xiàn)No bean named 'xxxx'&
這篇文章主要介紹了使用@Service注解出現(xiàn)No bean named 'xxxx' available]錯誤的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08Java的Hibernate框架中一對多的單向和雙向關(guān)聯(lián)映射
建立對SQL語句的映射是Hibernate框架操作數(shù)據(jù)庫的主要手段,這里我們列舉實例來為大家講解Java的Hibernate框架中一對多的單向和雙向關(guān)聯(lián)映射2016-06-06