spring?boot學習筆記之操作ActiveMQ指南
前言
消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削鋒等問題,實現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu),是大型分布式系統(tǒng)不可缺少的中間件。
目前在生產(chǎn)環(huán)境中使用較多的消息隊列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ 等。
特性
- 異步性:將耗時的同步操作通過以發(fā)送消息的方式進行了異步化處理,減少了同步等待的時間。
- 松耦合:消息隊列減少了服務(wù)之間的耦合性,不同的服務(wù)可以通過消息隊列進行通信,而不用關(guān)心彼此的實現(xiàn)細節(jié),只要定義好消息的格式就行。
- 分布式:通過對消費者的橫向擴展,降低了消息隊列阻塞的風險,以及單個消費者產(chǎn)生單點故障的可能性(當然消息隊列本身也可以做成分布式集群)。
- 可靠性:消息隊列一般會把接收到的消息存儲到本地硬盤上(當消息被處理完之后,存儲信息根據(jù)不同的消息隊列實現(xiàn),有可能將其刪除),這樣即使應(yīng)用掛掉或者消息隊列本身掛掉,消息也能夠重新加載。
JMS 規(guī)范
JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個 Java 平臺中關(guān)于面向消息中間件(MOM)的 API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。Java 消息服務(wù)是一個與具體平臺無關(guān)的 API,絕大多數(shù) MOM 提供商都對 JMS 提供支持。
JMS 的消息機制有 2 種模型,一種是 Point to Point,表現(xiàn)為隊列的形式,發(fā)送的消息,只能被一個接收者取走;另一種是 Topic,可以被多個訂閱者訂閱,類似于群發(fā)。
ActiveMQ 是 JMS 的一個實現(xiàn)。
ActiveMQ 介紹
ActiveMQ 是 Apache 軟件基金下的一個開源軟件,它遵循 JMS1.1 規(guī)范(Java Message Service),是消息驅(qū)動中間件軟件(MOM)。它為企業(yè)消息傳遞提供高可用、出色性能、可擴展、穩(wěn)定和安全保障。ActiveMQ 使用 Apache 許可協(xié)議,因此,任何人都可以使用和修改它而不必反饋任何改變。
ActiveMQ 的目標是在盡可能多的平臺和語言上提供一個標準的,消息驅(qū)動的應(yīng)用集成。ActiveMQ 實現(xiàn) JMS 規(guī)范并在此之上提供大量額外的特性。ActiveMQ 支持隊列和訂閱兩種模式的消息發(fā)送。
Spring Boot 提供了 ActiveMQ 組件 spring-boot-starter-activemq,用來支持 ActiveMQ 在 Spring Boot 體系內(nèi)使用,下面我們來詳細了解如何使用。
添加依賴
主要添加組件:spring-boot-starter-activemq。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
配置文件
在 application.properties 中添加配置。
# 基于內(nèi)存的 ActiveMQ spring.activemq.in-memory=true # 不適應(yīng)連接池 spring.activemq.pool.enabled=false # 獨立安裝的 ActiveMQ #spring.activemq.broker-url=tcp://192.168.0.1:61616 #spring.activemq.user=admin #spring.activemq.password=admin
在使用 ActiveMQ 時有兩種使用方式,一種是使用獨立安裝的 ActiveMQ,在生產(chǎn)環(huán)境推薦使用這種;另一種是使用基于內(nèi)存 ActiveMQ ,在調(diào)試階段建議使用這種方式。
隊列(Queue)
隊列發(fā)送的消息,只能被一個消費者接收。
創(chuàng)建隊列
@Configuration public class MqConfig { @Bean public Queue queue() { return new ActiveMQQueue("neo.queue"); } }
使用 @Configuration 注解在項目啟動時,定義了一個隊列 queue 命名為:neo.queue。
消息生產(chǎn)者
創(chuàng)建一個消息的生產(chǎn)者:
@Component public class Producer{ @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; public void sendQueue(String msg) { System.out.println("send queue msg :"+msg); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); } }
JmsMessagingTemplate 是 Spring 提供發(fā)送消息的工具類,使用 JmsMessagingTemplate 和創(chuàng)建好的 queue 對消息進行發(fā)送。
消息消費者
@Component public class Consumer { @JmsListener(destination = "neo.queue") public void receiveQueue(String text) { System.out.println("Consumer queue msg : "+text); } }
使用注解 @JmsListener(destination = "neo.queue"),表示此方法監(jiān)控了名為 neo.queue 的隊列。當隊列 neo.queue 中有消息發(fā)送時會觸發(fā)此方法的執(zhí)行,text 為消息內(nèi)容。
測試
創(chuàng)建 SampleActiveMqTests 測試類,注入創(chuàng)建好的消息生產(chǎn)者。
@RunWith(SpringRunner.class) @SpringBootTest public class SampleActiveMqTests { @Autowired private Producer producer; @Rule public OutputCapture outputCapture = new OutputCapture(); }
OutputCapture 是 Spring Boot 提供的一個測試類,它能捕獲 System.out 和 System.err 的輸出,我們可以利用這個特性來判斷程序中的輸出是否執(zhí)行。
@Test public void sendSimpleQueueMessage() throws InterruptedException { this.producer.sendQueue("Test queue message"); Thread.sleep(1000L); assertThat(this.outputCapture.toString().contains("Test queue")).isTrue(); }
創(chuàng)建測試方式,使用 producer 發(fā)送消息,為了保證容器可以接收到消息,讓測試方法等待 1 秒,最后使用 outputCapture 判斷是否執(zhí)行成功。
測試多消費者
上面的案例只是一個生產(chǎn)者一個消費者,我們在模擬一個生產(chǎn)者和多個消費者隊列的執(zhí)行情況。我們復制上面的消費者 Consumer 重新命名為 Consumer2,并且將輸出內(nèi)容加上 2 的關(guān)鍵字,如下:
@Component public class Consumer2 { @JmsListener(destination = "neo.queue") public void receiveQueue(String text) { System.out.println("Consumer2 queue msg : "+text); } }
在剛才的測試類中添加一個 send100QueueMessage() 方法,模式發(fā)送 100 條消息時,兩個消費者是如何消費消息的。
@Test public void send100QueueMessage() throws InterruptedException { for (int i=0;i<100;i++){ this.producer.sendQueue("Test queue message"+i); } Thread.sleep(1000L); }
控制臺輸出結(jié)果:
Consumer queue msg : Test queue message0
Consumer2 queue msg : Test queue message1
Consumer queue msg : Test queue message2
Consumer2 queue msg : Test queue message3
...
根據(jù)控制臺輸出的消息可以看出,當有多個消費者監(jiān)聽一個隊列時,消費者會自動均衡負載的接收消息,并且每個消息只能有一個消費者所接收。
注意:控制臺輸出 javax.jms.JMSException: peer (vm://localhost#1) stopped. 報錯信息可以忽略,這是 Info 級別的錯誤,是 ActiveMQ 的一個 bug。
廣播(Topic)
廣播發(fā)送的消息,可以被多個消費者接收。
創(chuàng)建 Topic
@Configuration public class MqConfig { @Bean public Topic topic() { return new ActiveMQTopic("neo.topic"); } }
使用 @Configuration 注解在項目啟動時,定義了一個廣播 Topic 命名為:neo.topic。
消息生產(chǎn)者
創(chuàng)建一個消息的生產(chǎn)者:
@Component public class Producer{ @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Topic topic; public void sendTopic(String msg) { System.out.println("send topic msg :"+msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); } }
和上面的生產(chǎn)者對比只是 convertAndSend() 方法傳入的第一個參數(shù)變成了 Topic。
消息消費者
@Component public class Consumer { @JmsListener(destination = "neo.topic") public void receiveTopic(String text) { System.out.println("Consumer topic msg : "+text); } }
消費者也沒有變化,只是監(jiān)聽的名改為上面的 neo.topic,因為模擬多個消費者,復制一份 Consumer 命名為 Consumer2,代碼相同在輸出中標明來自 Consumer2。
測試
創(chuàng)建 SampleActiveMqTests 測試類,注入創(chuàng)建好的消息生產(chǎn)者。
@Test public void sendSimpleTopicMessage() throws InterruptedException { this.producer.sendTopic("Test Topic message"); Thread.sleep(1000L); }
測試方法執(zhí)行成功后,會看到控制臺輸出信息,如下:
send topic msg :Test Topic message
Consumer topic msg : Test Topic message
Consumer2 topic msg : Test Topic message
可以看出兩個消費者都收到了發(fā)送的消息,從而驗證廣播(Topic)是一個發(fā)送者多個消費者的模式。
同時支持隊列(Queue)和廣播(Topic)
Spring Boot 集成 ActiveMQ 的項目默認只支持隊列或者廣播中的一種,通過配置項 spring.jms.pub-sub-domain 的值來控制,true 為廣播模式,false 為隊列模式,默認情況下支持隊列模式。
如果需要在同一項目中既支持隊列模式也支持廣播模式,可以通過 DefaultJmsListenerContainerFactory 創(chuàng)建自定義的 JmsListenerContainerFactory 實例,之后在 @JmsListener 注解中通過 containerFactory 屬性引用它。
分別創(chuàng)建兩個自定義的 JmsListenerContainerFactory 實例,通過 pubSubDomain 來控制是支持隊列模式還是廣播模式。
@Configuration @EnableJms public class ActiveMQConfig { @Bean("queueListenerFactory") public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListenerFactory") public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
然后在消費者接收的方法中,指明使用 containerFactory 接收消息。
@Component public class Consumer { @JmsListener(destination = "neo.queue", containerFactory = "queueListenerFactory") public void receiveQueue(String text) { System.out.println("Consumer queue msg : "+text); } @JmsListener(destination = "neo.topic", containerFactory = "topicListenerFactory") public void receiveTopic(String text) { System.out.println("Consumer topic msg : "+text); } }
改造完成之后,再次執(zhí)行隊列和廣播的測試方法,就會發(fā)現(xiàn)項目同時支持了兩種類型的消息收發(fā)。
總結(jié)
消息中間件廣泛應(yīng)用在大型互聯(lián)網(wǎng)架構(gòu)中,利用消息中間件隊列和廣播各自的特性可以支持很多業(yè)務(wù),比如群發(fā)發(fā)送短信、給單個用戶發(fā)送郵件等。ActiveMQ 是一款非常流行的消息中間件,它的特點是部署簡單、使用方便,比較適合中小型團隊。Spring Boot 提供了集成 ActiveMQ 對應(yīng)的組件,在 Spring Boot 中使用 ActiveMQ 只需要添加相關(guān)注解即可。
到此這篇關(guān)于spring?boot學習筆記之操作ActiveMQ指南的文章就介紹到這了,更多相關(guān)spring?boot操作ActiveMQ指南內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java17中record替代Lombok部分功能使用場景探究
這篇文章主要介紹了使用Java17中的record替代Lombok的部分功能,本文來為大家小小的總結(jié)下,我們可以在哪些地方,利用record來替換Lombok2024-01-01解決idea默認帶的equals和hashcode引起的bug
這篇文章主要介紹了解決idea默認帶的equals和hashcode引起的bug,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07java虛擬機參數(shù)-D、-X和-XX的區(qū)別小結(jié)
本文主要介紹了java虛擬機參數(shù)-D、-X和-XX的區(qū)別小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-06-06