SpringBoot整合ActiveMQ的詳細步驟
1. 引入依賴
pom文件引入activemq依賴
<!--activeMq配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.7</version> </dependency>
2. 配置文件
spring: activemq: user: admin password: admin broker-url: failover:(tcp://192.168.43.666:61616) #是否信任所有包(如果傳遞的是對象則需要設置為true,默認是傳字符串) packages: trust-all: true #連接池 pool: enabled: true max-connections: 5 idle-timeout: 30000 # expiry-timeout: 0 jms: #默認使用queue模式,使用topic則需要設置為true pub-sub-domain: true # 是否信任所有包 #spring.activemq.packages.trust-all= # 要信任的特定包的逗號分隔列表(當不信任所有包時) #spring.activemq.packages.trusted= # 當連接請求和池滿時是否阻塞。設置false會拋“JMSException異?!薄? #spring.activemq.pool.block-if-full=true # 如果池仍然滿,則在拋出異常前阻塞時間。 #spring.activemq.pool.block-if-full-timeout=-1ms # 是否在啟動時創(chuàng)建連接??梢栽趩訒r用于加熱池。 #spring.activemq.pool.create-connection-on-startup=true # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。 #spring.activemq.pool.enabled=false # 連接過期超時。 #spring.activemq.pool.expiry-timeout=0ms # 連接空閑超時 #spring.activemq.pool.idle-timeout=30s # 連接池最大連接數 #spring.activemq.pool.max-connections=1 # 每個連接的有效會話的最大數目。 #spring.activemq.pool.maximum-active-session-per-connection=500 # 當有"JMSException"時嘗試重新連接 #spring.activemq.pool.reconnect-on-exception=true # 在空閑連接清除線程之間運行的時間。當為負數時,沒有空閑連接驅逐線程運行。 #spring.activemq.pool.time-between-expiration-check=-1ms # 是否只使用一個MessageProducer #spring.activemq.pool.use-anonymous-producers=true
3. 生產者
package com.gblfy.producer; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jms.JmsProperties; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.*; import java.io.Serializable; /** * 發(fā)送消息 * * @author gblfy * @date 2022-11-02 */ @RestController @RequestMapping(value = "/active") public class SendController { //也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 發(fā)送消息接口 * 發(fā)送queue消息 :http://127.0.0.1:8080/active/send?msg=ceshi1234 * 發(fā)送topic 消息: http://127.0.0.1:8080/active/topic/send?msg=ceshi1234 * 發(fā)送queue消息(延遲time毫秒) :http://127.0.0.1:8080/active/send?msg=ceshi1234&time=5000 * * @param msg 消息 * @param type url中參數,非必須 * @param time * @return */ @RequestMapping({"/send", "/{type}/send"}) public String send(@PathVariable(value = "type", required = false) String type, String msg, Long time) { Destination destination = null; if (type == null) { type = ""; } switch (type) { case "topic": //發(fā)送廣播消息 destination = new ActiveMQTopic("active.topic"); break; default: //發(fā)送 隊列消息 destination = new ActiveMQQueue("active.queue"); break; } // System.out.println("開始請求發(fā)送:"+DateUtil.getStringDate(new Date(),"yyyy-MM-dd HH:mm:ss")); if (time != null && time > 0) { //延遲隊列,延遲time毫秒 //延遲隊列需要在 <broker>標簽上增加屬性 schedulerSupport="true" delaySend(destination, msg, time); } else { jmsMessagingTemplate.convertAndSend(destination, msg);//無序 //jmsMessagingTemplate.convertSendAndReceive();//有序 } return "activemq消息發(fā)送成功 隊列消息:" + msg; } /** * 延時發(fā)送 * 說明:延遲隊列需要在 <broker>標簽上增加屬性 schedulerSupport="true" * * @param destination 發(fā)送的隊列 * @param data 發(fā)送的消息 * @param time 延遲時間 /毫秒 */ public <T extends Serializable> void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 獲取連接工廠 ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory(); try { // 獲取連接 connection = connectionFactory.createConnection(); connection.start(); // 獲取session,true開啟事務,false關閉事務 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建一個消息隊列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //設置延遲時間 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 發(fā)送消息 producer.send(message); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
4. 配置config
package com.gblfy.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.Queue; import javax.jms.Topic; /** * 描述: * activemq 有兩種模式 queue 和 topic * queue 模式是單對單,有多個消費者的情況下則是使用輪詢監(jiān)聽 * topic 模式/廣播模式/發(fā)布訂閱模式 是一對多,發(fā)送消息所有的消費者都能夠監(jiān)聽到 * * @author gblfy * @date 2022-11-02 */ @EnableJms @Configuration public class ActiveMQConfig { //隊列名 private static final String queueName = "active.queue"; //主題名 private static final String topicName = "active.topic"; @Value("${spring.activemq.user:}") private String username; @Value("${spring.activemq.password:}") private String password; @Value("${spring.activemq.broker-url:}") private String brokerUrl; @Bean public Queue acQueue() { return new ActiveMQQueue(queueName); } @Bean public Topic acTopic() { return new ActiveMQTopic(topicName); } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(username, password, brokerUrl); } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); // 關閉Session事務,手動確認與事務沖突 bean.setSessionTransacted(false); // 設置消息的簽收模式(自己簽收) /** * AUTO_ACKNOWLEDGE = 1 :自動確認 * CLIENT_ACKNOWLEDGE = 2:客戶端手動確認 * DUPS_OK_ACKNOWLEDGE = 3: 自動批量確認 * SESSION_TRANSACTED = 0:事務提交并確認 * 但是在activemq補充了一個自定義的ACK模式: * INDIVIDUAL_ACKNOWLEDGE = 4:單條消息確認 **/ bean.setSessionAcknowledgeMode(4); //此處設置消息重發(fā)規(guī)則,redeliveryPolicy() 中定義 connectionFactory.setRedeliveryPolicy(redeliveryPolicy()); bean.setConnectionFactory(connectionFactory); return bean; } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); // 關閉Session事務,手動確認與事務沖突 bean.setSessionTransacted(false); bean.setSessionAcknowledgeMode(4); //設置為發(fā)布訂閱方式, 默認情況下使用的生產消費者方式 bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } /** * 消息的重發(fā)規(guī)則配置 */ @Bean public RedeliveryPolicy redeliveryPolicy() { RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); // 是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); // 重發(fā)次數五次, 總共六次 redeliveryPolicy.setMaximumRedeliveries(5); // 重發(fā)時間間隔,默認為1000ms(1秒) redeliveryPolicy.setInitialRedeliveryDelay(1000); // 重發(fā)時長遞增的時間倍數2 redeliveryPolicy.setBackOffMultiplier(2); // 是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); // 設置重發(fā)最大拖延時間-1表示無延遲限制 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } }
5. queue消費者
package com.gblfy.listener; import org.apache.activemq.command.ActiveMQMessage; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; /** * TODO * * @author gblfy * @Date 2022-11-02 **/ @Component public class QueueListener { /** * queue 模式 單對單,兩個消費者監(jiān)聽同一個隊列則通過輪詢接收消息 * containerFactory屬性的值關聯(lián)config類中的聲明 * * @param msg */ @JmsListener(destination = "active.queue", containerFactory = "jmsListenerContainerQueue") public void queueListener(ActiveMQMessage message, Session session, String msg) throws JMSException { try { System.out.println("active queue 接收到消息 " + msg); //手動簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } }
6. topic消費者
package com.gblfy.listener; import org.apache.activemq.command.ActiveMQMessage; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; /** * TODO * * @author gblfy * @Date 2022-11-02 **/ @Component public class TopicListener { /** * topic 模式/廣播模式/發(fā)布訂閱模式 一對多,多個消費者可同時接收到消息 * topic 模式無死信隊列,死信隊列是queue模式 * containerFactory屬性的值關聯(lián)config類中的聲明 * * @param msg */ @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic") public void topicListener(ActiveMQMessage message, Session session, String msg) throws JMSException { try { // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss")); System.out.println("active topic 接收到消息 " + msg); System.out.println(""); //手動簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic") public void topicListener2(ActiveMQMessage message, Session session, String msg) throws JMSException { try { // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss")); System.out.println("active topic2 接收到消息 " + msg); System.out.println(""); //手動簽收 message.acknowledge(); } catch (Exception e) { //重新發(fā)送 session.recover(); } } }
6. ActiveMQ 消息存儲規(guī)則
QUEUE 點對點:
特點:消息遵循先到先得,消息只能被一個消費者消費。
消息存儲規(guī)則:消費者消費消息成功,MQ服務端消息刪除
TOPIC訂閱模式: 消息屬于廣播(訂閱)模式,消息會被所有的topic消費者消費消息。
消息存儲規(guī)則:所有消費者消費成功,MQ服務端消息刪除,有一個消息沒有沒有消費完成,消息也會存儲在MQ服務端。
舉例:
已經處于運行topic消費者5個,5個消費者消費完成后,MQ服務端消息刪除。
擴展點補充:如果想額外添加topic消費者,如果MQ服務端消息沒有被消費完畢,新增topic消費者可以消費以前未被消費的消息,
正常新增的只會消費新的topic消息。
總結
到此這篇關于SpringBoot整合ActiveMQ的文章就介紹到這了,更多相關SpringBoot整合ActiveMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java日期接收報錯:could?not?be?parsed,?unparsed?text?found?a
在做Java開發(fā)時肯定會碰到傳遞時間參數的情況,這篇文章主要給大家介紹了關于Java日期接收報錯:could?not?be?parsed,?unparsed?text?found?at?index?10的解決辦法,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-01-01Java+ElasticSearch+Pytorch實現(xiàn)以圖搜圖功能
這篇文章主要為大家詳細介紹了Java如何利用ElasticSearch和Pytorch實現(xiàn)以圖搜圖功能,文中的示例代碼講解詳細,具有一定的學習價值,感興趣的小伙伴可以了解一下2023-06-06