SpringBoot整合ActiveMQ的詳細(xì)步驟
1. 引入依賴
pom文件引入activemq依賴
<!--activeMq配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.7</version> </dependency>
2. 配置文件
spring: activemq: user: admin password: admin broker-url: failover:(tcp://192.168.43.666:61616) #是否信任所有包(如果傳遞的是對(duì)象則需要設(shè)置為true,默認(rèn)是傳字符串) packages: trust-all: true #連接池 pool: enabled: true max-connections: 5 idle-timeout: 30000 # expiry-timeout: 0 jms: #默認(rèn)使用queue模式,使用topic則需要設(shè)置為true pub-sub-domain: true # 是否信任所有包 #spring.activemq.packages.trust-all= # 要信任的特定包的逗號(hào)分隔列表(當(dāng)不信任所有包時(shí)) #spring.activemq.packages.trusted= # 當(dāng)連接請(qǐng)求和池滿時(shí)是否阻塞。設(shè)置false會(huì)拋“JMSException異?!?。 #spring.activemq.pool.block-if-full=true # 如果池仍然滿,則在拋出異常前阻塞時(shí)間。 #spring.activemq.pool.block-if-full-timeout=-1ms # 是否在啟動(dòng)時(shí)創(chuàng)建連接。可以在啟動(dòng)時(shí)用于加熱池。 #spring.activemq.pool.create-connection-on-startup=true # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。 #spring.activemq.pool.enabled=false # 連接過期超時(shí)。 #spring.activemq.pool.expiry-timeout=0ms # 連接空閑超時(shí) #spring.activemq.pool.idle-timeout=30s # 連接池最大連接數(shù) #spring.activemq.pool.max-connections=1 # 每個(gè)連接的有效會(huì)話的最大數(shù)目。 #spring.activemq.pool.maximum-active-session-per-connection=500 # 當(dāng)有"JMSException"時(shí)嘗試重新連接 #spring.activemq.pool.reconnect-on-exception=true # 在空閑連接清除線程之間運(yùn)行的時(shí)間。當(dāng)為負(fù)數(shù)時(shí),沒有空閑連接驅(qū)逐線程運(yùn)行。 #spring.activemq.pool.time-between-expiration-check=-1ms # 是否只使用一個(gè)MessageProducer #spring.activemq.pool.use-anonymous-producers=true
3. 生產(chǎn)者
package com.gblfy.producer; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jms.JmsProperties; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.*; import java.io.Serializable; /** * 發(fā)送消息 * * @author gblfy * @date 2022-11-02 */ @RestController @RequestMapping(value = "/active") public class SendController { //也可以注入JmsTemplate,JmsMessagingTemplate對(duì)JmsTemplate進(jìn)行了封裝 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 發(fā)送消息接口 * 發(fā)送queue消息 :http://127.0.0.1:8080/active/send?msg=ceshi1234 * 發(fā)送topic 消息: http://127.0.0.1:8080/active/topic/send?msg=ceshi1234 * 發(fā)送queue消息(延遲time毫秒) :http://127.0.0.1:8080/active/send?msg=ceshi1234&time=5000 * * @param msg 消息 * @param type url中參數(shù),非必須 * @param time * @return */ @RequestMapping({"/send", "/{type}/send"}) public String send(@PathVariable(value = "type", required = false) String type, String msg, Long time) { Destination destination = null; if (type == null) { type = ""; } switch (type) { case "topic": //發(fā)送廣播消息 destination = new ActiveMQTopic("active.topic"); break; default: //發(fā)送 隊(duì)列消息 destination = new ActiveMQQueue("active.queue"); break; } // System.out.println("開始請(qǐng)求發(fā)送:"+DateUtil.getStringDate(new Date(),"yyyy-MM-dd HH:mm:ss")); if (time != null && time > 0) { //延遲隊(duì)列,延遲time毫秒 //延遲隊(duì)列需要在 <broker>標(biāo)簽上增加屬性 schedulerSupport="true" delaySend(destination, msg, time); } else { jmsMessagingTemplate.convertAndSend(destination, msg);//無(wú)序 //jmsMessagingTemplate.convertSendAndReceive();//有序 } return "activemq消息發(fā)送成功 隊(duì)列消息:" + msg; } /** * 延時(shí)發(fā)送 * 說明:延遲隊(duì)列需要在 <broker>標(biāo)簽上增加屬性 schedulerSupport="true" * * @param destination 發(fā)送的隊(duì)列 * @param data 發(fā)送的消息 * @param time 延遲時(shí)間 /毫秒 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 獲取連接工廠 ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory(); try { // 獲取連接 connection = connectionFactory.createConnection(); connection.start(); // 獲取session,true開啟事務(wù),false關(guān)閉事務(wù) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建一個(gè)消息隊(duì)列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //設(shè)置延遲時(shí)間 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 發(fā)送消息 producer.send(message); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
4. 配置config
package com.gblfy.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.Queue; import javax.jms.Topic; /** * 描述: * activemq 有兩種模式 queue 和 topic * queue 模式是單對(duì)單,有多個(gè)消費(fèi)者的情況下則是使用輪詢監(jiān)聽 * topic 模式/廣播模式/發(fā)布訂閱模式 是一對(duì)多,發(fā)送消息所有的消費(fèi)者都能夠監(jiān)聽到 * * @author gblfy * @date 2022-11-02 */ @EnableJms @Configuration public class ActiveMQConfig { //隊(duì)列名 private static final String queueName = "active.queue"; //主題名 private static final String topicName = "active.topic"; @Value("${spring.activemq.user:}") private String username; @Value("${spring.activemq.password:}") private String password; @Value("${spring.activemq.broker-url:}") private String brokerUrl; @Bean public Queue acQueue() { return new ActiveMQQueue(queueName); } @Bean public Topic acTopic() { return new ActiveMQTopic(topicName); } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(username, password, brokerUrl); } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); // 關(guān)閉Session事務(wù),手動(dòng)確認(rèn)與事務(wù)沖突 bean.setSessionTransacted(false); // 設(shè)置消息的簽收模式(自己簽收) /** * AUTO_ACKNOWLEDGE = 1 :自動(dòng)確認(rèn) * CLIENT_ACKNOWLEDGE = 2:客戶端手動(dòng)確認(rèn) * DUPS_OK_ACKNOWLEDGE = 3: 自動(dòng)批量確認(rèn) * SESSION_TRANSACTED = 0:事務(wù)提交并確認(rèn) * 但是在activemq補(bǔ)充了一個(gè)自定義的ACK模式: * INDIVIDUAL_ACKNOWLEDGE = 4:?jiǎn)螚l消息確認(rèn) **/ bean.setSessionAcknowledgeMode(4); //此處設(shè)置消息重發(fā)規(guī)則,redeliveryPolicy() 中定義 connectionFactory.setRedeliveryPolicy(redeliveryPolicy()); bean.setConnectionFactory(connectionFactory); return bean; } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); // 關(guān)閉Session事務(wù),手動(dòng)確認(rèn)與事務(wù)沖突 bean.setSessionTransacted(false); bean.setSessionAcknowledgeMode(4); //設(shè)置為發(fā)布訂閱方式, 默認(rèn)情況下使用的生產(chǎn)消費(fèi)者方式 bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } /** * 消息的重發(fā)規(guī)則配置 */ @Bean public RedeliveryPolicy redeliveryPolicy() { RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); // 是否在每次嘗試重新發(fā)送失敗后,增長(zhǎng)這個(gè)等待時(shí)間 redeliveryPolicy.setUseExponentialBackOff(true); // 重發(fā)次數(shù)五次, 總共六次 redeliveryPolicy.setMaximumRedeliveries(5); // 重發(fā)時(shí)間間隔,默認(rèn)為1000ms(1秒) redeliveryPolicy.setInitialRedeliveryDelay(1000); // 重發(fā)時(shí)長(zhǎng)遞增的時(shí)間倍數(shù)2 redeliveryPolicy.setBackOffMultiplier(2); // 是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); // 設(shè)置重發(fā)最大拖延時(shí)間-1表示無(wú)延遲限制 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } }
5. queue消費(fèi)者
package com.gblfy.listener; import org.apache.activemq.command.ActiveMQMessage; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; /** * TODO * * @author gblfy * @Date 2022-11-02 **/ @Component public class QueueListener { /** * queue 模式 單對(duì)單,兩個(gè)消費(fèi)者監(jiān)聽同一個(gè)隊(duì)列則通過輪詢接收消息 * containerFactory屬性的值關(guān)聯(lián)config類中的聲明 * * @param msg */ @JmsListener(destination = "active.queue", containerFactory = "jmsListenerContainerQueue") public void queueListener(ActiveMQMessage message, Session session, String msg) throws JMSException { try { System.out.println("active queue 接收到消息 " + msg); //手動(dòng)簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } }
6. topic消費(fèi)者
package com.gblfy.listener; import org.apache.activemq.command.ActiveMQMessage; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; /** * TODO * * @author gblfy * @Date 2022-11-02 **/ @Component public class TopicListener { /** * topic 模式/廣播模式/發(fā)布訂閱模式 一對(duì)多,多個(gè)消費(fèi)者可同時(shí)接收到消息 * topic 模式無(wú)死信隊(duì)列,死信隊(duì)列是queue模式 * containerFactory屬性的值關(guān)聯(lián)config類中的聲明 * * @param msg */ @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic") public void topicListener(ActiveMQMessage message, Session session, String msg) throws JMSException { try { // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss")); System.out.println("active topic 接收到消息 " + msg); System.out.println(""); //手動(dòng)簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic") public void topicListener2(ActiveMQMessage message, Session session, String msg) throws JMSException { try { // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss")); System.out.println("active topic2 接收到消息 " + msg); System.out.println(""); //手動(dòng)簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } }
6. ActiveMQ 消息存儲(chǔ)規(guī)則
QUEUE 點(diǎn)對(duì)點(diǎn):
特點(diǎn):消息遵循先到先得,消息只能被一個(gè)消費(fèi)者消費(fèi)。
消息存儲(chǔ)規(guī)則:消費(fèi)者消費(fèi)消息成功,MQ服務(wù)端消息刪除
TOPIC訂閱模式: 消息屬于廣播(訂閱)模式,消息會(huì)被所有的topic消費(fèi)者消費(fèi)消息。
消息存儲(chǔ)規(guī)則:所有消費(fèi)者消費(fèi)成功,MQ服務(wù)端消息刪除,有一個(gè)消息沒有沒有消費(fèi)完成,消息也會(huì)存儲(chǔ)在MQ服務(wù)端。
舉例:
已經(jīng)處于運(yùn)行topic消費(fèi)者5個(gè),5個(gè)消費(fèi)者消費(fèi)完成后,MQ服務(wù)端消息刪除。
擴(kuò)展點(diǎn)補(bǔ)充:如果想額外添加topic消費(fèi)者,如果MQ服務(wù)端消息沒有被消費(fèi)完畢,新增topic消費(fèi)者可以消費(fèi)以前未被消費(fèi)的消息,
正常新增的只會(huì)消費(fèi)新的topic消息。
總結(jié)
到此這篇關(guān)于SpringBoot整合ActiveMQ的文章就介紹到這了,更多相關(guān)SpringBoot整合ActiveMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot簡(jiǎn)單實(shí)現(xiàn)文件上傳
這篇文章主要介紹了SpringBoot簡(jiǎn)單實(shí)現(xiàn)文件上傳,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,感興趣的小伙伴可以參考一下2022-09-09Java日期接收?qǐng)?bào)錯(cuò):could?not?be?parsed,?unparsed?text?found?a
在做Java開發(fā)時(shí)肯定會(huì)碰到傳遞時(shí)間參數(shù)的情況,這篇文章主要給大家介紹了關(guān)于Java日期接收?qǐng)?bào)錯(cuò):could?not?be?parsed,?unparsed?text?found?at?index?10的解決辦法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01Java并發(fā)之異步的八種實(shí)現(xiàn)方式
本文主要介紹了Java并發(fā)之異步的八種實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06Java+ElasticSearch+Pytorch實(shí)現(xiàn)以圖搜圖功能
這篇文章主要為大家詳細(xì)介紹了Java如何利用ElasticSearch和Pytorch實(shí)現(xiàn)以圖搜圖功能,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解一下2023-06-06