spring整合JMS實現(xiàn)同步收發(fā)消息(基于ActiveMQ的實現(xiàn))
本文介紹了spring整合JMS實現(xiàn)同步收發(fā)消息(基于ActiveMQ的實現(xiàn)),分享給大家,具體如下:
1. 安裝ActiveMQ
注意:JDK版本需要1.7及以上才行
到Apache官方網(wǎng)站下載最新的ActiveMQ的安裝包,并解壓到本地目錄下,下載鏈接如下:http://activemq.apache.org/download.html,解壓后的目錄結(jié)構(gòu)如下:
bin目錄結(jié)構(gòu)如下:


如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat,運行結(jié)果如下:

啟動成功!成功之后在瀏覽器輸入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理頁面,用戶名和密碼默認都是admin,如下:

2. 新建一個Maven工程,并配置pom文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.chhliu.myself</groupId>
<artifactId>activemq_start</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>activemq_start</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-version>3.2.5.RELEASE</spring-version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>jsr250-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
</project>
3. 配置連接工廠(ConnectionFactory)
Spring給我們提供了如下的連接工廠:

其中SingleConnectionFactory保證每次返回的都是同一個連接,CachingConnectionFactory繼承了SingleConnectionFactory,在保證同一連接的同時,增加了緩存的功能,可以緩存Session以及生產(chǎn)者,消費者。當然,JMS提供的連接工廠只是用來實現(xiàn)管理的,并不是真正連接MQ的,真正的連接工廠需要具體的MQ廠商提供,下面我們以ActiveMQ為例來說明,配置如下:
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應(yīng)的 JMS服務(wù)廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
為了減少我們連接的資源消耗,ActiveMQ為我們提供了一個連接工廠管理池--PooledConnectionFactory,通過連接工廠池,可以將Connection,Session等都放在池里面,用的時候直接返回池里面的內(nèi)容,無需臨時建立連接,節(jié)約開銷。配置如下:
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應(yīng)的 JMS服務(wù)廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化這樣可以大大減少我們的資源消耗, -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="10" />
</bean>
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>
4. 配置JmsTemplate
配置好連接工廠之后,就需要配置JMS的JmsTemplate,JmsTemplate的作用和JdbcTemplate類似,我們發(fā)送和接收消息,都是通過JmsTemplate來實現(xiàn)的,配置如下:
<!-- 配置生產(chǎn)者:配置好ConnectionFactory之后我們就需要配置生產(chǎn)者。生產(chǎn)者負責產(chǎn)生消息并發(fā)送到JMS服務(wù)器,這通常對應(yīng)的是我們的一個業(yè)務(wù)邏輯服務(wù)實現(xiàn)類。 但是我們的服務(wù)實現(xiàn)類是怎么進行消息的發(fā)送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現(xiàn)的, 所以配置生產(chǎn)者其實最核心的就是配置進行消息發(fā)送的JmsTemplate。對于消息發(fā)送者而言,它在發(fā)送消息的時候要知道自己該往哪里發(fā), 為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象 -->
<!-- Spring提供的JMS工具類,它可以進行消息發(fā)送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應(yīng)的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
5. 生產(chǎn)者實現(xiàn)
配置完這些之后,我們就可以寫代碼實現(xiàn)生產(chǎn)者和消費者了,生產(chǎn)者主要用來生產(chǎn)消息,并向目的隊列中推送消息,接口定義如下:
public interface ProducerService {
void sendMessage(Destination destination, final String message);
}
實現(xiàn)類代碼如下:
@Service("producerServiceImpl")
public class ProducerServiceImpl implements ProducerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:發(fā)送消息
* @author chhliu
* 創(chuàng)建時間:2016-7-28 下午2:33:14
* @param destination
* @param message
*/
@Override
public void sendMessage(Destination receivedestination, final String message) {
System.out.println("================生產(chǎn)者創(chuàng)建了一條消息==============");
jTemplate.send(receivedestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello acticeMQ:"+message);
}
});
}
}
6. 消費者實現(xiàn)
假設(shè)生產(chǎn)者已經(jīng)創(chuàng)建了一條消息,并推送到了對應(yīng)的隊列中,消費者需要從這個隊列中取出消息,并同時回復一條報文,自己已經(jīng)收到了這條消息,為了測試回復報文的功能,我們下面會將回復報文放到另一個隊列中,此例使用同步接收消息的方式,而不是異步監(jiān)聽的方式實現(xiàn),接口定義如下:
public interface ConsumerService {
String receiveMessage(Destination destination, Destination replyDestination);
}
實現(xiàn)類代碼如下:
@Service("consumerServiceImpl")
public class ConsumerServiceImpl implements ConsumerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:接收消息,同時回復消息
* @author chhliu
* 創(chuàng)建時間:2016-7-28 下午2:39:45
* @param destination
* @return
*/
@Override
public String receiveMessage(Destination destination, Destination replyDestination) {
/**
* 接收消息隊列中的消息
*/
Message message = jTemplate.receive(destination);
try {
/**
* 此處為了更好的容錯性,可以使用instanceof來判斷下消息類型
*/
if(message instanceof TextMessage){
String receiveMessage = ((TextMessage) message).getText();
System.out.println("收到生產(chǎn)者的消息:"+receiveMessage);
/**
* 收到消息之后,將回復報文放到回復隊列里面去
*/
jTemplate.send(replyDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("消費者已經(jīng)收到生產(chǎn)者的消息了,這是一條確認報文!");
}
});
return receiveMessage;
}
} catch (JMSException e) {
e.printStackTrace();
}
return "";
}
}
生產(chǎn)者和消費者實現(xiàn)之后,我們要做的就是配置隊列了,下面給出項目完整的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:jpa="http://www.springframework.org/schema/data/jpa"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache-3.2.xsd
http://www.springframework.org/schema/data/jpa
http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd">
<!-- 掃描注解包 -->
<context:annotation-config />
<context:component-scan base-package="com.chhliu.myself.activemq.start"></context:component-scan>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="10" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- 在真正利用JmsTemplate進行消息發(fā)送的時候,我們需要知道消息發(fā)送的目的地,即destination。 在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行消息發(fā)送時沒有指定destination的時候?qū)⑹褂媚J的Destination。 默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入, defaultDestinationName對應(yīng)的就是一個普通字符串 -->
<!--這個是隊列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_INPUT</value>
</constructor-arg>
</bean>
<!--這個是回復隊列,點對點的 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_OUTPUT</value>
</constructor-arg>
</bean>
</beans>
到這里,所有的代碼和配置文件就都整好了,下面就是進行測試,測試代碼如下:
生產(chǎn)者測試代碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncProducerActiveMQTest {
@Resource(name="producerServiceImpl")
private ProducerService pService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Test
public void producerTest(){
pService.sendMessage(receiveQueue, "my name is chhliu!");
}
}
消費者測試代碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncConsumerActiveMQTest {
@Resource(name="consumerServiceImpl")
private ConsumerService cService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Resource(name="responseQueue")
private Destination replyQueue;
@Test
public void producerTest(){
String result = cService.receiveMessage(receiveQueue, replyQueue);
System.out.println(result);
}
}
測試結(jié)果如下:
生產(chǎn)者測試結(jié)果: ================生產(chǎn)者創(chuàng)建了一條消息============== 消費者測試結(jié)果: 收到生產(chǎn)者的消息:hello acticeMQ:my name is chhliu! hello acticeMQ:my name is chhliu!
再來看下ActiveMQ的管理頁面的結(jié)果:

從管理頁面中可以看到,生產(chǎn)者生產(chǎn)了消息,并且入隊列了,同時消費者也消費了消息,并將回復消息放到了回復隊列中,測試成功。
但是這種同步取消息的方式有個缺點,每次只會取一條消息消費,取完之后就會一直阻塞,下面來測試一下:首先讓生產(chǎn)者再生產(chǎn)5條消息,然后運行消費者程序,發(fā)現(xiàn)會只消費一條消息,除非我們在消費者程序里面加while(true),一直輪詢隊列,這種實現(xiàn)方式不僅耗內(nèi)存,效率也不是很高,后面,我們會對這種方式進行改進,使用異步監(jiān)聽模式,測試效果如下:
生產(chǎn)者創(chuàng)建了5條消息:
=======生產(chǎn)者創(chuàng)建了一條消息========
=======生產(chǎn)者創(chuàng)建了一條消息========
=======生產(chǎn)者創(chuàng)建了一條消息========
=======生產(chǎn)者創(chuàng)建了一條消息========
======生產(chǎn)者創(chuàng)建了一條消息=========
ActiveMQ管理頁面如下:

消費者消費一條消息:
收到生產(chǎn)者的消息:hello acticeMQ:my name is chhliu!
hello acticeMQ:my name is chhliu!
消費者消費消息后,ActiveMQ管理頁面如下:

從上面的對比中,我們可以看出來,同步模式下,消費者消費消息時,是逐條消費,每次只消費一條消息。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringCloud?Gateway詳細分析實現(xiàn)負載均衡與熔斷和限流
這篇文章主要介紹了SpringCloud?Gateway實現(xiàn)路由轉(zhuǎn)發(fā),負載均衡,熔斷和限流,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07
springboot集成redis實現(xiàn)簡單秒殺系統(tǒng)
這篇文章主要為大家詳細介紹了springboot集成redis實現(xiàn)簡單秒殺系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-12-12
MyBatis-Plus通用枚舉自動關(guān)聯(lián)注入的實現(xiàn)
本文主要介紹了MyBatis-Plus通用枚舉自動關(guān)聯(lián)注入的實現(xiàn),解決了繁瑣的配置,讓 mybatis 優(yōu)雅的使用枚舉屬性,感興趣的可以一起來了解一下2021-06-06
Java中使用正則表達式獲取網(wǎng)頁中所有圖片的路徑
這篇文章主要介紹了Java中使用正則表達式獲取網(wǎng)頁中所有圖片的路徑,本文直接給出實例代碼,需要的朋友可以參考下2015-06-06

