SpringMVC和rabbitmq集成的使用案例
1.添加maven依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
2.spring主配置文件中加入rabbitMQ xml文件的配置
<!-- rabbitMQ 配置 --> <import resource="/application-mq.xml"/>
3.jdbc配置文件中加入 rabbitmq的鏈接配置
#rabbitMQ配置 mq.host=localhost mq.username=donghao mq.password=donghao mq.port=5672 mq.vhost=testMQ
4.新建application-mq.xml文件,添加配置信息
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<description>rabbitmq 連接服務(wù)配置</description>
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- spring template聲明-->
<rabbit:template exchange="koms" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
<!-- 消息對象json轉(zhuǎn)換類 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!--
durable:是否持久化
exclusive: 僅創(chuàng)建者可以使用的私有隊列,斷開后自動刪除
auto_delete: 當(dāng)所有消費客戶端連接斷開后,是否自動刪除隊列
-->
<!-- 申明一個消息隊列Queue -->
<rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="activity" name="activity" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="stock" name="stock" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="autoPrint" name="autoPrint" durable="true" auto-delete="false" exclusive="false" />
<!--
rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉(zhuǎn)發(fā)。
rabbit:binding:設(shè)置消息queue匹配的key
-->
<!-- 交換機定義 -->
<rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
<rabbit:bindings>
<rabbit:binding queue="order" key="order"/>
<rabbit:binding queue="activity" key="activity"/>
<rabbit:binding queue="mail" key="mail"/>
<rabbit:binding queue="stock" key="stock"/>
<rabbit:binding queue="autoPrint" key="autoPrint"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--
queues:監(jiān)聽的隊列,多個的話用逗號(,)分隔
ref:監(jiān)聽器
-->
<!-- 配置監(jiān)聽 acknowledeg = "manual" 設(shè)置手動應(yīng)答 當(dāng)消息處理失敗時:會一直重發(fā) 直到消息處理成功 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<!-- 配置監(jiān)聽器 -->
<rabbit:listener queues="activity" ref="activityListener"/>
<rabbit:listener queues="order" ref="orderListener"/>
<rabbit:listener queues="mail" ref="mailListener"/>
<rabbit:listener queues="stock" ref="stockListener"/>
<rabbit:listener queues="autoPrint" ref="autoPrintListener"/>
</rabbit:listener-container>
</beans>
5.新增公共入隊類
@Service
public class MQProducerImpl{
@Resource
private AmqpTemplate amqpTemplate;
private final static Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
//公共入隊方法
public void sendDataToQueue(String queueKey, Object object) {
try {
amqpTemplate.convertAndSend(queueKey, object);
} catch (Exception e) {
logger.error(e.toString());
}
}
}
6.創(chuàng)建監(jiān)聽類

import java.io.IOException;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.cn.framework.domain.BaseDto;
import com.cn.framework.util.ConstantUtils;
import com.cn.framework.util.RabbitMq.producer.MQProducer;
import com.kxs.service.activityService.IActivityService;
import com.kxs.service.messageService.IMessageService;
import com.rabbitmq.client.Channel;
/**
* 活動處理listener
* @author
* @date 2017年6月30日
**/
@Component
public class ActivityListener implements ChannelAwareMessageListener {
private static final Logger log = LoggerFactory.getLogger(ActivityListener.class);
@Override
@Transactional
public void onMessage(Message message,Channel channel) {
}
}

項目啟動后 控制臺會打印出監(jiān)聽的日志信息 這里寫圖片描述
結(jié)尾:僅供參考,自己用作學(xué)習(xí)記錄,不喜勿噴,共勉!
補充:RabbitMQ與SpringMVC集成并實現(xiàn)發(fā)送消息和接收消息(持久化)方案
RabbitMQ本篇不介紹了,直接描述RabbitMQ與SpringMVC集成并實現(xiàn)發(fā)送消息和接收消息(持久化)。
使用了Spring-rabbit 發(fā)送消息和接收消息,我們使用的Maven來管理Jar包,在Maven的pom.xml文件中引入jar包
<span style="font-size:18px;"> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.6.RELEASE</version> </dependency></span>
1.實現(xiàn)生產(chǎn)者
第一步:是要設(shè)置調(diào)用安裝RabbitMQ的IP、端口等
配置一個global.properties文件

第二步:通過SpringMVC把global.properties文件讀進來
<span style="font-size:18px;"><!-- 注入屬性文件 -->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:global.properties</value>
</list>
</property>
</bean> </span>
第三步:配置 RabbitMQ服務(wù)器連接、創(chuàng)建rabbitTemplate 消息模板類等,在SpringMVC的配置文件加入下面這些
<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>
<span style="font-size:18px;"> <!-- 創(chuàng)建連接類 -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<property name="username" value="${rmq.manager.user}" />
<property name="password" value="${rmq.manager.password}" />
<property name="host" value="${rmq.ip}" />
<property name="port" value="${rmq.port}" />
</bean>
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="connectionFactory" />
</bean>
<!-- 創(chuàng)建rabbitTemplate 消息模板類 -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
</bean> </span>
第四步:實現(xiàn)消息類實體和發(fā)送消息
類實體
<span style="font-size:18px;">/**
* 消息
*
*/
public class RabbitMessage implements Serializable
{
private static final long serialVersionUID = -6487839157908352120L;
private Class<?>[] paramTypes;//參數(shù)類型
private String exchange;//交換器
private Object[] params;
private String routeKey;//路由key
public RabbitMessage(){}
public RabbitMessage(String exchange,String routeKey,Object...params)
{
this.params=params;
this.exchange=exchange;
this.routeKey=routeKey;
}
@SuppressWarnings("rawtypes")
public RabbitMessage(String exchange,String routeKey,String methodName,Object...params)
{
this.params=params;
this.exchange=exchange;
this.routeKey=routeKey;
int len=params.length;
Class[] clazzArray=new Class[len];
for(int i=0;i<len;i++)
clazzArray[i]=params[i].getClass();
this.paramTypes=clazzArray;
}
public byte[] getSerialBytes()
{
byte[] res=new byte[0];
ByteArrayOutputStream baos=new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(this);
oos.close();
res=baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
public String getRouteKey() {
return routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public Object[] getParams() {
return params;
}
}
</span>
發(fā)送消息
<span style="font-size:18px;">/**
* 生產(chǎn)著
*
*/
public class RmqProducer
{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送信息
* @param msg
*/
public void sendMessage(RabbitMessage msg)
{
try {
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
System.out.println(rabbitTemplate.getConnectionFactory().getPort());
//發(fā)送信息
rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
} catch (Exception e) {
}
}
}</span>
說明:
1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
源代碼中的send調(diào)用的方法,一些發(fā)送消息幫我們實現(xiàn)好了。

2.上面的代碼實現(xiàn)沒申明交換器和隊列,RabbitMQ不知交換器和隊列他們的綁定關(guān)系,如果RabbitMQ管理器上沒有對應(yīng)的交換器和隊列是不會新建的和關(guān)聯(lián)的,需要手動關(guān)聯(lián)。

我們也可以用代碼申明:
rabbitAdmin要申明:eclareExchange方法 參數(shù)是交換器
BindingBuilder.bind(queue).to(directExchange).with(queueName);//將queue綁定到exchange rabbitAdmin.declareBinding(binding);//聲明綁定關(guān)系
源代碼有這些方法:

這樣就可以實現(xiàn)交換器和隊列的綁定關(guān)系
交換器我們可以申明為持久化,還有使用完不會自動刪除
TopicExchange 參數(shù)的說明:name是交換器名稱,durable:true 是持久化 autoDelete:false使用完不刪除
源代碼:

隊列也可以申明為持久化

第五步:實現(xiàn)測試類
<span style="font-size:18px;">@Resource
private RmqProducer rmqProducer2;
@Test
public void test() throws IOException
{
String exchange="testExchange";交換器
String routeKey="testQueue";//隊列
String methodName="test";//調(diào)用的方法
//參數(shù)
Map<String,Object> param=new HashMap<String, Object>();
param.put("data","hello");
RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);
//發(fā)送消息
rmqProducer2.sendMessage(msg);
}</span>
結(jié)果:RabbitMQ有一條消息

2.消費者
第一步:RabbitMQ服務(wù)器連接這些在生產(chǎn)者那邊已經(jīng)介紹了,這邊就不介紹了,我們要配置 RabbitMQ服務(wù)器連接、創(chuàng)建rabbitTemplate 消息模板類、消息轉(zhuǎn)換器、消息轉(zhuǎn)換器監(jiān)聽器等,在SpringMVC的配置文件加入下面這些
<span style="font-size:18px;"> <!-- 創(chuàng)建連接類 -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<property name="username" value="${rmq.manager.user}" />
<property name="password" value="${rmq.manager.password}" />
<property name="host" value="${rmq.ip}" />
<property name="port" value="${rmq.port}" />
</bean>
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="connectionFactory" />
</bean>
<!-- 創(chuàng)建rabbitTemplate 消息模板類 -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
</bean>
<!-- 創(chuàng)建消息轉(zhuǎn)換器為SimpleMessageConverter -->
<bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
<!-- 設(shè)置持久化的隊列 -->
<bean id="queue" class="org.springframework.amqp.core.Queue">
<constructor-arg index="0" value="testQueue"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
<constructor-arg index="3" value="false"></constructor-arg>
</bean>
<!--創(chuàng)建交換器的類型 并持久化-->
<bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange">
<constructor-arg index="0" value="testExchange"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
</bean>
<util:map id="arguments">
</util:map>
<!-- 綁定交換器、隊列 -->
<bean id="binding" class="org.springframework.amqp.core.Binding">
<constructor-arg index="0" value="testQueue"></constructor-arg>
<constructor-arg index="1" value="QUEUE"></constructor-arg>
<constructor-arg index="2" value="testExchange"></constructor-arg>
<constructor-arg index="3" value="testQueue"></constructor-arg>
<constructor-arg index="4" value="#{arguments}"></constructor-arg>
</bean>
<!-- 用于接收消息的處理類 -->
<bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean>
<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<property name="defaultListenerMethod" value="rmqProducerMessage"></property>
<property name="messageConverter" ref="serializerMessageConverter"></property>
</bean>
<!-- 用于消息的監(jiān)聽的容器類SimpleMessageListenerContainer,監(jiān)聽隊列 queues可以傳多個-->
<bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queues" ref="queue"></property>
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="messageListener" ref="messageListenerAdapter"></property>
</bean>
</span>
說明:
1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以傳入多個隊列

2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
有哪個消費者適配器來處理 ,參數(shù)defaultListenerMethod是默認調(diào)用方法來處理消息。
3.交換器和隊列的持久化在生產(chǎn)者有介紹過了。
4.org.springframework.amqp.core.Binding這個類的綁定,在SpringMVC配置文件中配置時,
DestinationType這個參數(shù)要注意點
源代碼:

第二步:處理消息
<span style="font-size:18px;">/**
* 消費者
*
*/
public class RmqConsumer
{
public void rmqProducerMessage(Object object){
RabbitMessage rabbitMessage=(RabbitMessage) object;
System.out.println(rabbitMessage.getExchange());
System.out.println(rabbitMessage.getRouteKey());
System.out.println(rabbitMessage.getParams().toString());
}
}</span>
在啟動過程中會報這樣的錯誤,可能是你的交換器和隊列沒配置好

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊列使用示例詳解
- Spring Boot中RabbitMQ自動配置的介紹、原理和使用方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringBoot+RabbitMq具體使用的幾種姿勢
- 詳解Spring Cloud Stream使用延遲消息實現(xiàn)定時任務(wù)(RabbitMQ)
- SpringBoot之RabbitMQ的使用方法
- spring boot使用RabbitMQ實現(xiàn)topic 主題
- Spring3?中?RabbitMQ?的使用與常見場景分析
相關(guān)文章
java集合PriorityQueue優(yōu)先級隊列方法實例
這篇文章主要為大家介紹了java集合PriorityQueue優(yōu)先級隊列方法實例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12
java ArrayBlockingQueue的方法及缺點分析
在本篇內(nèi)容里小編給大家整理的是一篇關(guān)于java ArrayBlockingQueue的方法及缺點分析,對此有興趣的朋友們可以跟著學(xué)習(xí)下。2021-01-01
MybatisPlus使用queryWrapper如何實現(xiàn)復(fù)雜查詢
這篇文章主要介紹了MybatisPlus使用queryWrapper如何實現(xiàn)復(fù)雜查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教。2022-01-01

