Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息
一、安裝activeMQ
Linux環(huán)境ActiveMQ部署方法:http://www.dbjr.com.cn/article/162320.htm
安裝步驟參照上面這篇文章,本文不做介紹
Windows下安裝ActiveMQ:
到官網(wǎng)(http://activemq.apache.org/download-archives.html)下載最新發(fā)布的壓縮包(我下的是5.15.9)到本地后解壓(我解壓到D盤Dev目錄下)即可。進(jìn)入解壓后的bin目錄,我是64位機(jī)器,再進(jìn)入win64目錄后,雙擊activemq.bat啟動(dòng):
wrapper | --> Wrapper Started as Console
wrapper | Launching a JVM...
jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved.
jvm 1 |
jvm 1 | Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181
jvm 1 | Heap sizes: current=125952k free=115299k max=932352k
jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1 | Extensions classpath:
jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
jvm 1 | ACTIVEMQ_HOME: ..\..
jvm 1 | ACTIVEMQ_BASE: ..\..
jvm 1 | ACTIVEMQ_CONF: ..\..\conf
jvm 1 | ACTIVEMQ_DATA: ..\..\data
jvm 1 | Loading message broker from: xbean:activemq.xml
jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchy
jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb]
jvm 1 | INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] started
jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is starting
jvm 1 | INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector openwire started
jvm 1 | INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector amqp started
jvm 1 | INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector stomp started
jvm 1 | INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector mqtt started
jvm 1 | INFO | Starting Jetty server
jvm 1 | INFO | Creating Jetty connector
jvm 1 | WARN | ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING} has uncovered http methods for path: /
jvm 1 | INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1 | INFO | Connector ws started
jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) started
jvm 1 | INFO | For help or more information please see: http://activemq.apache.org
jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mb
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml
默認(rèn)端口8161,訪問下http://localhost:8161/admin,用戶名密碼都是admin,進(jìn)入控制臺(tái)頁面:

我們用坐上方的Queues來創(chuàng)建一個(gè)叫vboxlog的隊(duì)列:
二、修改activeMQ配置文件
broker新增配置信息 schedulerSupport="true"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
三、創(chuàng)建SpringBoot工程
1、配置ActiveMQ工廠信息,信任包必須配置否則會(huì)報(bào)錯(cuò)
package com.example.demoactivemq.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* @author shanks on 2019-11-12
*/
@Configuration
public class ActiveMqConfig {
@Bean
public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 設(shè)置信任序列化包集合
List<String> models = new ArrayList<>();
models.add("com.example.demoactivemq.domain");
factory.setTrustedPackages(models);
return factory;
}
}
消息實(shí)體類
package com.example.demoactivemq.domain;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author shanks on 2019-11-12
*/
@Builder
@Data
public class MessageModel implements Serializable {
private String titile;
private String message;
}
生產(chǎn)者
package com.example.demoactivemq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.*;
import java.io.Serializable;
/**
* 消息生產(chǎn)者
*
* @author shanks
*/
@Service
@Slf4j
public class Producer {
public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");
@Autowired
private JmsMessagingTemplate template;
/**
* 發(fā)送消息
*
* @param destination destination是發(fā)送到的隊(duì)列
* @param message message是待發(fā)送的消息
*/
public <T extends Serializable> void send(Destination destination, T message) {
template.convertAndSend(destination, message);
}
/**
* 延時(shí)發(fā)送
*
* @param destination 發(fā)送的隊(duì)列
* @param data 發(fā)送的消息
* @param time 延遲時(shí)間
*/
public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
// 獲取連接工廠
ConnectionFactory connectionFactory = template.getConnectionFactory();
try {
// 獲取連接
connection = connectionFactory.createConnection();
connection.start();
// 獲取session,true開啟事務(wù),false關(guān)閉事務(wù)
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
producer = session.createProducer(destination);
producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
ObjectMessage message = session.createObjectMessage(data);
//設(shè)置延遲時(shí)間
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
// 發(fā)送消息
producer.send(message);
log.info("發(fā)送消息:{}", data);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消費(fèi)者
package com.example.demoactivemq.producer;
import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者
*/
@Component
@Slf4j
public class Consumer {
@JmsListener(destination = "delay.queue")
public void receiveQueue(MessageModel message) {
log.info("收到消息:{}", message);
}
}
application.yml
spring: activemq: broker-url: tcp://localhost:61616
測(cè)試類
package com.example.demoactivemq;
import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = DemoActivemqApplication.class)
@RunWith(SpringRunner.class)
class DemoActivemqApplicationTests {
/**
* 消息生產(chǎn)者
*/
@Autowired
private Producer producer;
/**
* 及時(shí)消息隊(duì)列測(cè)試
*/
@Test
public void test() {
MessageModel messageModel = MessageModel.builder()
.message("測(cè)試消息")
.titile("消息000")
.build();
// 發(fā)送消息
producer.send(Producer.DEFAULT_QUEUE, messageModel);
}
/**
* 延時(shí)消息隊(duì)列測(cè)試
*/
@Test
public void test2() {
for (int i = 0; i < 5; i++) {
MessageModel messageModel = MessageModel.builder()
.titile("延遲10秒執(zhí)行")
.message("測(cè)試消息" + i)
.build();
// 發(fā)送延遲消息
producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
}
try {
// 休眠100秒,等等消息執(zhí)行
Thread.currentThread().sleep(100000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執(zhí)行結(jié)果
2019-11-12 22:18:52.939 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息0)
2019-11-12 22:18:52.953 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息1)
2019-11-12 22:18:52.958 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息2)
2019-11-12 22:18:52.964 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息3)
2019-11-12 22:18:52.970 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 發(fā)送消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息4)
2019-11-12 22:19:03.012 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息0)
2019-11-12 22:19:03.017 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息1)
2019-11-12 22:19:03.019 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息2)
2019-11-12 22:19:03.020 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息3)
2019-11-12 22:19:03.021 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延遲10秒執(zhí)行, message=測(cè)試消息4)
比你優(yōu)秀的人比你還努力,你有什么資格不去奮斗!!!
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
spring boot中各個(gè)版本的redis配置問題詳析
這篇文章主要給大家介紹了關(guān)于spring boot中各個(gè)版本的redis配置問題的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12
Java的Netty進(jìn)階之Future和Promise詳解
這篇文章主要介紹了Java的Netty進(jìn)階之Future和Promise詳解,Netty 是基于 Java NIO 的異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,使用 Netty 可以快速開發(fā)網(wǎng)絡(luò)應(yīng)用,Netty 提供了高層次的抽象來簡化 TCP 和 UDP 服務(wù)器的編程,但是你仍然可以使用底層的 API,需要的朋友可以參考下2023-11-11
Spring?Boot實(shí)現(xiàn)WebSocket實(shí)時(shí)通信
本文主要介紹了Spring?Boot實(shí)現(xiàn)WebSocket實(shí)時(shí)通信,包含實(shí)現(xiàn)實(shí)時(shí)消息傳遞和群發(fā)消息等功能,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05
在Map中實(shí)現(xiàn)key唯一不重復(fù)操作
這篇文章主要介紹了在Map中實(shí)現(xiàn)key唯一不重復(fù)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
詳解SpringCloud Finchley Gateway 統(tǒng)一異常處理
這篇文章主要介紹了詳解SpringCloud Finchley Gateway 統(tǒng)一異常處理,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-10-10

