SpringBoot集成ActiveMQ的實(shí)戰(zhàn)全過程
前言
在項(xiàng)目開發(fā)的過程中我們經(jīng)常會(huì)遇到類似的業(yè)務(wù)場(chǎng)景:用戶申請(qǐng)?zhí)岈F(xiàn),后臺(tái)進(jìn)行賬務(wù)處理、發(fā)送提現(xiàn)短信、調(diào)用銀行打款通道。
在這個(gè)過程中調(diào)用三方通道(短信或銀行通道)都比較耗時(shí),同時(shí)賬務(wù)處理可能也是由專門的賬務(wù)系統(tǒng)進(jìn)行處理。那么,為了提高并發(fā)和相應(yīng)速度,后面的三個(gè)操作都可以通過異步進(jìn)行處理。這就用到了消息隊(duì)列。
消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削鋒等問題,實(shí)現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu),是大型分布式系統(tǒng)不可缺少的中間件。
市面上比較常見的消息隊(duì)列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。
在Spring Boot的starter中專門集成了ActiveMQ,因此,本篇文章我們就來講講對(duì)ActiveMQ的集成。
JMS規(guī)范
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
JMS的消息機(jī)制有2種模型,一種是隊(duì)列的形式(Point to Point—)發(fā)送的消息只能被一個(gè)消費(fèi)者消費(fèi);一種是訂閱(Topic)模式,可以被多個(gè)訂閱者訂閱,訂閱者都會(huì)接收到同樣的消息。
而ActiveMQ就是對(duì)JMS的實(shí)現(xiàn)之一。
ActiveMQ介紹
ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實(shí)現(xiàn),ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的、面向消息的、能夠跨越多語(yǔ)言和多系統(tǒng)的應(yīng)用集成消息通信中間件。
它為企業(yè)應(yīng)用中消息傳遞提供高可用、出色性能、可擴(kuò)展、穩(wěn)定和安全保障。
ActiveMQ實(shí)現(xiàn)JMS規(guī)范并在此之上提供大量額外的特性。ActiveMQ支持隊(duì)列和訂閱兩種模式的消息發(fā)送。
AcitveMQ的數(shù)據(jù)傳送流程如下圖:
ActiveMQ的兩種消息傳遞類型:
(1)點(diǎn)對(duì)點(diǎn)傳輸,即一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者向broke推送數(shù)據(jù),數(shù)據(jù)存儲(chǔ)在broke的一個(gè)隊(duì)列中,當(dāng)消費(fèi)者接受該條隊(duì)列里的數(shù)據(jù)。
(2)基于發(fā)布/訂閱模式的傳輸,即根據(jù)訂閱話題來接收相應(yīng)數(shù)據(jù),一個(gè)生產(chǎn)者可向多個(gè)消費(fèi)者推送數(shù)據(jù),與MQTT協(xié)議的實(shí)現(xiàn)是類似的。
兩種消息傳遞類型的不同,點(diǎn)對(duì)點(diǎn)傳輸消費(fèi)者可以接收到在連接之前生產(chǎn)者所推送的數(shù)據(jù),而基于發(fā)布/訂閱模式的傳輸方式消費(fèi)者只能接收到連接之后生產(chǎn)者推送的數(shù)據(jù)。
Spring Boot集成ActiveMQ
Spring Boot針對(duì)ActiveMQ專門提供了spring-boot-starter-activemq,用來支持ActiveMQ在Spring Boot的自動(dòng)集成配置。在此基礎(chǔ)上我們可以很輕易的進(jìn)行集成和使用。
創(chuàng)建項(xiàng)目并引入依賴
創(chuàng)建標(biāo)準(zhǔn)的Spring Boot項(xiàng)目,并在項(xiàng)目中引入以下依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
此時(shí)如果不需要web或其他相關(guān)處理,只引入該依賴即可。如果使用pool的話, 就需要在pom中加入以下依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
配置文件
在application.properties中添加如下配置:
# 基于內(nèi)存的ActiveMQ spring.activemq.in-memory=true # 不使用連接池,如果使用連接池還需在pom中添加activemq-pool的依賴 spring.activemq.pool.enabled=false # 獨(dú)立安裝的ActiveMQ #spring.activemq.broker-url=tcp://127.0.0.1:61616 #spring.activemq.user=admin #spring.activemq.password=admin
上述配置中有兩套配置,Spring Boot支持基于內(nèi)存ActiveMQ和基于獨(dú)立安裝的ActiveMQ。正常請(qǐng)求基于內(nèi)存的形式是為了方便測(cè)試而使用,基于獨(dú)立安裝的形式才是真正用于生產(chǎn)環(huán)境。此處為了講解功能,方便測(cè)試,采用基于內(nèi)存的形式。
隊(duì)列模式實(shí)例
首先,我們來實(shí)現(xiàn)基于隊(duì)列(Queue)形式的實(shí)現(xiàn)。這里需要用到兩個(gè)類ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ對(duì)javax.jms.Queue的接口實(shí)現(xiàn)。后者為Spring提供發(fā)送消息的工具類,結(jié)合Queue對(duì)消息進(jìn)行發(fā)送。
JmsMessagingTemplate默認(rèn)已經(jīng)被實(shí)例化,直接拿來使用即可。而ActiveMQQueue則需要我們進(jìn)行實(shí)例化,并傳入消息隊(duì)列的名稱。
@Configuration public class MyMqConfig { @Bean public Queue queue() { return new ActiveMQQueue("sms.queue"); } }
Spring Boot中很常規(guī)的實(shí)例化操作,不再贅述。當(dāng)實(shí)例化完ActiveMQQueue之后,我們的隊(duì)列便創(chuàng)建完成,下面創(chuàng)建對(duì)應(yīng)的生產(chǎn)者和消費(fèi)者。
生產(chǎn)者對(duì)應(yīng)代碼如下:
@Component public class Producer { @Resource private JmsMessagingTemplate jmsMessagingTemplate; @Resource private Queue queue; public void sendMsg(String msg) { System.out.println("發(fā)送消息內(nèi)容 :" + msg); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); } }
此處用到JmsMessagingTemplate和Queue,上面已經(jīng)提到,這兩個(gè)類都已經(jīng)完成了初始化。消費(fèi)者對(duì)應(yīng)的配置如下:
@Component public class Consumer { @JmsListener(destination = "sms.queue") public void receiveMsg(String text) { System.out.println("接收到消息 : "+text); } }
Spring提供了注解式監(jiān)聽器端點(diǎn):使用@JmsListener。使用@JmsListener托管bean的帶注釋方法對(duì)其進(jìn)行訂閱。在Java8中,@JmsListener是一個(gè)可重復(fù)的注解,可以關(guān)聯(lián)多個(gè)JMS destinations到同一個(gè)方法中。而在Java 6和7中,可以使用@JmsListeners注解。
其中destination指定監(jiān)控的消息隊(duì)列名稱為“sms.queue”。當(dāng)隊(duì)列sms.queue中有消息發(fā)送時(shí)會(huì)觸發(fā)此方法的執(zhí)行,text為消息內(nèi)容。
上面完成了隊(duì)列初始化、生產(chǎn)者和消費(fèi)者代碼的編寫,下面通過單元測(cè)試來驗(yàn)證是否能夠正確發(fā)送和處理消息。
@RunWith(SpringRunner.class) @SpringBootTest public class ActiveMqTests { @Autowired private Producer producer; @Test public void sendSimpleQueueMessage() { this.producer.sendMsg("提現(xiàn)200.00元"); } }
執(zhí)行單元測(cè)試,會(huì)發(fā)現(xiàn)在日志中打印如下信息:
發(fā)送消息內(nèi)容 :提現(xiàn)200.00元
接收到消息 : 提現(xiàn)200.00元
說明消息可以正常發(fā)送和接收。如果是基于內(nèi)存模式,在執(zhí)行單元測(cè)試時(shí)會(huì)打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”異常日志,這是Info級(jí)別的錯(cuò)誤,是ActiveMQ的一個(gè)bug。
訂閱模式實(shí)例
廣播發(fā)送的消息,可以被多個(gè)消費(fèi)者接收。這里我們就在原有的基礎(chǔ)上進(jìn)行廣播消息的添加。
首先,Spring Boot集成ActiveMQ時(shí)默認(rèn)只支持隊(duì)列或者廣播之一,通過配置項(xiàng)spring.jms.pub-sub-domain來指定,true 為廣播模式,false為隊(duì)列模式,默認(rèn)情況下支持隊(duì)列模式。
此時(shí)要使用廣播模式,則需在配置文件中添加如下配置:
spring.jms.pub-sub-domain=true
需要注意的是,此時(shí)隊(duì)列模式不可正常工作。
然后在MyMqConfig中添加:
@Bean public Topic topic() { return new ActiveMQTopic("sms.topic"); }
這里創(chuàng)建了ActiveMQTopic,并將topic的名稱指定為sms.topic。
Producer中新增如下代碼:
@Resource private Topic topic; public void sendTopic(String msg) { System.out.println("發(fā)送Topic消息內(nèi)容 :"+msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); }
為了演示多個(gè)廣播接收者,在Comsumer中新增兩個(gè)消費(fèi)者:
@JmsListener(destination = "sms.topic") public void receiveTopic1(String text) { System.out.println("receiveTopic1接收到Topic消息 : " + text); } @JmsListener(destination = "sms.topic") public void receiveTopic2(String text) { System.out.println("receiveTopic2接收到Topic消息 : " + text); }
單元測(cè)試類中新增如下測(cè)試:
@Test public void sendSimpleTopicMessage() { this.producer.sendTopic("提現(xiàn)200.00元"); }
此時(shí),執(zhí)行單元測(cè)試,便可看到如下日志信息:
發(fā)送Topic消息內(nèi)容 :提現(xiàn)200.00元
receiveTopic2接收到Topic消息 : 提現(xiàn)200.00元
receiveTopic1接收到Topic消息 : 提現(xiàn)200.00元
說明消息發(fā)送成功。
同時(shí)支持兩種形式
在上面的實(shí)例中,要么支持隊(duì)列模式要么支持廣播模式,如果在生產(chǎn)環(huán)境中兩者都需要支持,那么就需要自定義JmsListenerContainerFactory實(shí)例。當(dāng)然,如果Spring Boot默認(rèn)的配置無法滿足需求,也可以自定義該類,這里只是其中場(chǎng)景之一。
基本配置和使用步驟:通過DefaultJmsListenerContainerFactory創(chuàng)建自定義的JmsListenerContainerFactory實(shí)例,在@JmsListener注解中通過containerFactory屬性進(jìn)行引用。
在MyMqConfig配置類中新增如下配置:
@Bean("queueListenerFactory") public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListenerFactory") public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //設(shè)置為發(fā)布訂閱方式, 默認(rèn)情況下使用的生產(chǎn)消費(fèi)者方式 factory.setPubSubDomain(true); return factory; }
這里分別實(shí)例化了基于隊(duì)列和訂閱的工廠類。然后分別在對(duì)應(yīng)的消費(fèi)者方法上添加containerFactory屬性。示例代碼如下:
@JmsListener(destination = "sms.queue", containerFactory = "queueListenerFactory") public void receiveMsg(String text) { System.out.println("接收到消息 : " + text); } @JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory") public void receiveTopic1(String text) { System.out.println("receiveTopic1接收到Topic消息 : " + text); }
分別執(zhí)行兩種形式的消息,發(fā)現(xiàn)都正?;ダ?。同時(shí),此時(shí)配置文件中的項(xiàng)spring.jms.pub-sub-domain也無效了。
其他事項(xiàng)
1、activeMq的端口號(hào)是61616;
2、使用topic,需要配置spring.jms.pub-sub-domain=true;
3、queue如果沒有消費(fèi)者,會(huì)將信息存儲(chǔ)到queue中;
4、發(fā)送的消息為對(duì)象的時(shí)候,需要將對(duì)象序列化;消費(fèi)者接收對(duì)象信息時(shí)需要使用ObjectMessage進(jìn)行轉(zhuǎn)化;
5、使用JmsListener注解中的containerFactory屬性,可以配置spring.jms.pub-sub屬性,實(shí)現(xiàn)同時(shí)接收queque和topic;
6、queue為點(diǎn)對(duì)點(diǎn)模式;tipic為發(fā)布訂閱模式;
7、示例中的消息隊(duì)列名稱(sms.queue和sms.topic)可根據(jù)需要設(shè)置成配置屬性;
源碼地址:github.com/secbr/sprin…
參考文章:?
http://www.dbjr.com.cn/article/230259.htm
http://www.dbjr.com.cn/article/230269.htm
總結(jié)
到此這篇關(guān)于SpringBoot集成ActiveMQ的文章就介紹到這了,更多相關(guān)SpringBoot集成ActiveMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
通過Mybatis實(shí)現(xiàn)單表內(nèi)一對(duì)多的數(shù)據(jù)展示示例代碼
最近做項(xiàng)目遇到這樣的需求要求將表中的數(shù)據(jù),按照一級(jí)二級(jí)分類返回給前端json數(shù)據(jù),下面通過本文給大家分享通過Mybatis實(shí)現(xiàn)單表內(nèi)一對(duì)多的數(shù)據(jù)展示示例代碼,感興趣的朋友參考下吧2017-08-08mybatis 解決從列名到屬性名的自動(dòng)映射失敗問題
這篇文章主要介紹了mybatis 解決從列名到屬性名的自動(dòng)映射失敗問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Elasticsearch?計(jì)數(shù)分詞中的token使用實(shí)例
這篇文章主要為大家介紹了Elasticsearch?計(jì)數(shù)分詞中的token使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01Android應(yīng)用開發(fā)的一般文件組織結(jié)構(gòu)講解
這篇文章主要介紹了Android應(yīng)用開發(fā)的一般文件組織結(jié)構(gòu)講解,同時(shí)附帶介紹了一個(gè)獲取Android的文件列表的方法,需要的朋友可以參考下2015-12-12