欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合ActiveMQ的詳細步驟

 更新時間:2022年11月03日 10:08:21   作者:gblfy  
昨天仔細研究了activeMQ消息隊列,也遇到了些坑,下面這篇文章主要給大家介紹了關于SpringBoot整合ActiveMQ的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下

1. 引入依賴

pom文件引入activemq依賴

    <!--activeMq配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>

2. 配置文件

spring:
  activemq:
    user: admin
    password: admin
    broker-url: failover:(tcp://192.168.43.666:61616)
    #是否信任所有包(如果傳遞的是對象則需要設置為true,默認是傳字符串)
    packages:
      trust-all: true
    #連接池
    pool:
      enabled: true
      max-connections: 5
      idle-timeout: 30000
#      expiry-timeout: 0
    jms:
      #默認使用queue模式,使用topic則需要設置為true
      pub-sub-domain: true

      # 是否信任所有包
      #spring.activemq.packages.trust-all=
      # 要信任的特定包的逗號分隔列表(當不信任所有包時)
      #spring.activemq.packages.trusted=
      # 當連接請求和池滿時是否阻塞。設置false會拋“JMSException異?!薄?
      #spring.activemq.pool.block-if-full=true
      # 如果池仍然滿,則在拋出異常前阻塞時間。
      #spring.activemq.pool.block-if-full-timeout=-1ms
      # 是否在啟動時創(chuàng)建連接??梢栽趩訒r用于加熱池。
      #spring.activemq.pool.create-connection-on-startup=true
      # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
      #spring.activemq.pool.enabled=false
      # 連接過期超時。
      #spring.activemq.pool.expiry-timeout=0ms
      # 連接空閑超時
      #spring.activemq.pool.idle-timeout=30s
      # 連接池最大連接數
      #spring.activemq.pool.max-connections=1
      # 每個連接的有效會話的最大數目。
      #spring.activemq.pool.maximum-active-session-per-connection=500
      # 當有"JMSException"時嘗試重新連接
      #spring.activemq.pool.reconnect-on-exception=true
      # 在空閑連接清除線程之間運行的時間。當為負數時,沒有空閑連接驅逐線程運行。
      #spring.activemq.pool.time-between-expiration-check=-1ms
      # 是否只使用一個MessageProducer
      #spring.activemq.pool.use-anonymous-producers=true

3. 生產者

package com.gblfy.producer;

import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.*;
import java.io.Serializable;

/**
 * 發(fā)送消息
 *
 * @author gblfy
 * @date 2022-11-02
 */
@RestController
@RequestMapping(value = "/active")
public class SendController {
    //也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 發(fā)送消息接口
     * 發(fā)送queue消息 :http://127.0.0.1:8080/active/send?msg=ceshi1234
     * 發(fā)送topic 消息: http://127.0.0.1:8080/active/topic/send?msg=ceshi1234
     * 發(fā)送queue消息(延遲time毫秒) :http://127.0.0.1:8080/active/send?msg=ceshi1234&time=5000
     *
     * @param msg  消息
     * @param type url中參數,非必須
     * @param time
     * @return
     */
    @RequestMapping({"/send", "/{type}/send"})
    public String send(@PathVariable(value = "type", required = false) String type, String msg, Long time) {
        Destination destination = null;
        if (type == null) {
            type = "";
        }
        switch (type) {
            case "topic":
                //發(fā)送廣播消息
                destination = new ActiveMQTopic("active.topic");
                break;
            default:
                //發(fā)送 隊列消息
                destination = new ActiveMQQueue("active.queue");
                break;
        }
        // System.out.println("開始請求發(fā)送:"+DateUtil.getStringDate(new Date(),"yyyy-MM-dd HH:mm:ss"));
        if (time != null && time > 0) {
            //延遲隊列,延遲time毫秒
            //延遲隊列需要在 <broker>標簽上增加屬性 schedulerSupport="true"
            delaySend(destination, msg, time);
        } else {
            jmsMessagingTemplate.convertAndSend(destination, msg);//無序
            //jmsMessagingTemplate.convertSendAndReceive();//有序
        }
        return "activemq消息發(fā)送成功 隊列消息:" + msg;
    }

    /**
     * 延時發(fā)送
     * 說明:延遲隊列需要在 <broker>標簽上增加屬性 schedulerSupport="true"
     *
     * @param destination 發(fā)送的隊列
     * @param data        發(fā)送的消息
     * @param time        延遲時間 /毫秒
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 獲取連接工廠
        ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
        try {
            // 獲取連接
            connection = connectionFactory.createConnection();
            connection.start();
            // 獲取session,true開啟事務,false關閉事務
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創(chuàng)建一個消息隊列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //設置延遲時間
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 發(fā)送消息
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

4. 配置config

package com.gblfy.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 描述:
 * activemq 有兩種模式 queue 和 topic
 * queue 模式是單對單,有多個消費者的情況下則是使用輪詢監(jiān)聽
 * topic 模式/廣播模式/發(fā)布訂閱模式 是一對多,發(fā)送消息所有的消費者都能夠監(jiān)聽到
 *
 * @author gblfy
 * @date 2022-11-02
 */
@EnableJms
@Configuration
public class ActiveMQConfig {
    //隊列名
    private static final String queueName = "active.queue";
    //主題名
    private static final String topicName = "active.topic";

    @Value("${spring.activemq.user:}")
    private String username;
    @Value("${spring.activemq.password:}")
    private String password;
    @Value("${spring.activemq.broker-url:}")
    private String brokerUrl;

    @Bean
    public Queue acQueue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic acTopic() {
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(username, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // 關閉Session事務,手動確認與事務沖突
        bean.setSessionTransacted(false);
        // 設置消息的簽收模式(自己簽收)
        /**
         * AUTO_ACKNOWLEDGE = 1 :自動確認
         * CLIENT_ACKNOWLEDGE = 2:客戶端手動確認
         * DUPS_OK_ACKNOWLEDGE = 3: 自動批量確認
         * SESSION_TRANSACTED = 0:事務提交并確認
         * 但是在activemq補充了一個自定義的ACK模式:
         * INDIVIDUAL_ACKNOWLEDGE = 4:單條消息確認
         **/
        bean.setSessionAcknowledgeMode(4);
        //此處設置消息重發(fā)規(guī)則,redeliveryPolicy() 中定義
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // 關閉Session事務,手動確認與事務沖突
        bean.setSessionTransacted(false);
        bean.setSessionAcknowledgeMode(4);
        //設置為發(fā)布訂閱方式, 默認情況下使用的生產消費者方式
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * 消息的重發(fā)規(guī)則配置
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // 是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        // 重發(fā)次數五次, 總共六次
        redeliveryPolicy.setMaximumRedeliveries(5);
        // 重發(fā)時間間隔,默認為1000ms(1秒)
        redeliveryPolicy.setInitialRedeliveryDelay(1000);
        // 重發(fā)時長遞增的時間倍數2
        redeliveryPolicy.setBackOffMultiplier(2);
        // 是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        // 設置重發(fā)最大拖延時間-1表示無延遲限制
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }
}

5. queue消費者

package com.gblfy.listener;

import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;

/**
 * TODO
 *
 * @author gblfy
 * @Date 2022-11-02
 **/
@Component
public class QueueListener {

    /**
     * queue 模式 單對單,兩個消費者監(jiān)聽同一個隊列則通過輪詢接收消息
     * containerFactory屬性的值關聯(lián)config類中的聲明
     *
     * @param msg
     */
    @JmsListener(destination = "active.queue", containerFactory = "jmsListenerContainerQueue")
    public void queueListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            System.out.println("active queue 接收到消息 " + msg);
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發(fā)送
            session.recover();
        }
    }
}

6. topic消費者

package com.gblfy.listener;

import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;

/**
 * TODO
 *
 * @author gblfy
 * @Date 2022-11-02
 **/
@Component
public class TopicListener {

    /**
     * topic 模式/廣播模式/發(fā)布訂閱模式 一對多,多個消費者可同時接收到消息
     * topic 模式無死信隊列,死信隊列是queue模式
     * containerFactory屬性的值關聯(lián)config類中的聲明
     *
     * @param msg
     */
    @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
    public void topicListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
            System.out.println("active topic 接收到消息 " + msg);
            System.out.println("");
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發(fā)送
            session.recover();
        }
    }

    @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
    public void topicListener2(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
            System.out.println("active topic2 接收到消息 " + msg);
            System.out.println("");
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發(fā)送
            session.recover();
        }
    }
}

6. ActiveMQ 消息存儲規(guī)則

QUEUE 點對點:

特點:消息遵循先到先得,消息只能被一個消費者消費。

消息存儲規(guī)則:消費者消費消息成功,MQ服務端消息刪除

TOPIC訂閱模式: 消息屬于廣播(訂閱)模式,消息會被所有的topic消費者消費消息。

消息存儲規(guī)則:所有消費者消費成功,MQ服務端消息刪除,有一個消息沒有沒有消費完成,消息也會存儲在MQ服務端。

舉例:

已經處于運行topic消費者5個,5個消費者消費完成后,MQ服務端消息刪除。

擴展點補充:如果想額外添加topic消費者,如果MQ服務端消息沒有被消費完畢,新增topic消費者可以消費以前未被消費的消息,
正常新增的只會消費新的topic消息。

總結

到此這篇關于SpringBoot整合ActiveMQ的文章就介紹到這了,更多相關SpringBoot整合ActiveMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Mac電腦安裝多個JDK版本的詳細圖文教程

    Mac電腦安裝多個JDK版本的詳細圖文教程

    目前使用的主流版本還是JDK 8,但偶爾會想體驗下新版本(或者舊版本),如果能裝多個版本的JDK,而且很方便的切換就好了,這篇文章主要給大家介紹了關于Mac電腦安裝多個JDK版本的相關資料,需要的朋友可以參考下
    2024-03-03
  • SpringBoot整合ActiveMQ過程解析

    SpringBoot整合ActiveMQ過程解析

    這篇文章主要介紹了SpringBoot整合ActiveMQ過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-09-09
  • SpringBoot簡單實現(xiàn)文件上傳

    SpringBoot簡單實現(xiàn)文件上傳

    這篇文章主要介紹了SpringBoot簡單實現(xiàn)文件上傳,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下
    2022-09-09
  • Spring?異步接口返回結果的四種方式

    Spring?異步接口返回結果的四種方式

    這篇文章主要介紹了Spring?異步接口返回結果的四種方式,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下
    2022-08-08
  • 解決pageHelper分頁失效以及如何配置問題

    解決pageHelper分頁失效以及如何配置問題

    這篇文章主要介紹了解決pageHelper分頁失效以及如何配置問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • java中實現(xiàn)四則運算代碼

    java中實現(xiàn)四則運算代碼

    本文給大家分享了幾個java中實現(xiàn)四則運算的代碼,有個人的也有網友的,代碼寫的不是很好,難免會有BUG,忘發(fā)現(xiàn)BUG的親們能提醒我下,好讓我改進
    2015-08-08
  • Java日期接收報錯:could?not?be?parsed,?unparsed?text?found?at?index?10解決辦法

    Java日期接收報錯:could?not?be?parsed,?unparsed?text?found?a

    在做Java開發(fā)時肯定會碰到傳遞時間參數的情況,這篇文章主要給大家介紹了關于Java日期接收報錯:could?not?be?parsed,?unparsed?text?found?at?index?10的解決辦法,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-01-01
  • Java中Runnable和Thread的區(qū)別分析

    Java中Runnable和Thread的區(qū)別分析

    在java中可有兩種方式實現(xiàn)多線程,一種是繼承Thread類,一種是實現(xiàn)Runnable接口,下面就拉分別介紹一下這兩種方法的優(yōu)缺點
    2013-03-03
  • Java并發(fā)之異步的八種實現(xiàn)方式

    Java并發(fā)之異步的八種實現(xiàn)方式

    本文主要介紹了Java并發(fā)之異步的八種實現(xiàn)方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • Java+ElasticSearch+Pytorch實現(xiàn)以圖搜圖功能

    Java+ElasticSearch+Pytorch實現(xiàn)以圖搜圖功能

    這篇文章主要為大家詳細介紹了Java如何利用ElasticSearch和Pytorch實現(xiàn)以圖搜圖功能,文中的示例代碼講解詳細,具有一定的學習價值,感興趣的小伙伴可以了解一下
    2023-06-06

最新評論