消息隊(duì)列 RabbitMQ 與 Spring 整合使用的實(shí)例代碼
一、什么是 RabbitMQ
RabbitMQ 是實(shí)現(xiàn) AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。
RabbitMQ 是由 Erlang 語言開發(fā),安裝 RabbitMQ 服務(wù)需要先安裝 Erlang 語言包。
二、如何與 Spring 集成
1. 我們都需要哪些 Jar 包?
拋開單獨(dú)使用 Spring 的包不說,引入 RabbitMQ 我們還需要兩個(gè):
<!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
2. 使用外部參數(shù)文件 application.properties:
mq.host=127.0.0.1 mq.username=queue mq.password=1234 mq.port=8001 # 統(tǒng)一XML配置中易變部分的命名 mq.queue=test_mq
易變指的是在實(shí)際項(xiàng)目中,如果測試與生產(chǎn)環(huán)境使用的同一個(gè) RabbitMQ 服務(wù)器。那我們在部署時(shí)直接修改 properties 文件的參數(shù)即可,防止測試與生產(chǎn)環(huán)境混淆。
修改 applicationContext.xml 文件,引入我們創(chuàng)建的 properties 文件
<context:property-placeholder location="classpath:application.properties"/> <util:properties id="appConfig" location="classpath:application.properties"></util:properties>
3. 連接 RabbitMQ 服務(wù)器
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
password="${mq.password}" port="${mq.port}" />
<rabbit:admin connection-factory="connectionFactory"/>
4. 聲明一個(gè) RabbitMQ Template
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
5. 在 applicationContext.xml 中聲明一個(gè)交換機(jī),name 屬性需配置到 RabbitMQ 服務(wù)器。
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
交換機(jī)的四種模式:
- direct:轉(zhuǎn)發(fā)消息到 routigKey 指定的隊(duì)列。
- topic:按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)。
- headers:(這個(gè)還沒有接觸到)
- fanout:轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列
交換器的屬性:
- 持久性:如果啟用,交換器將會(huì)在server重啟前都有效。
- 自動(dòng)刪除:如果啟用,那么交換器將會(huì)在其綁定的隊(duì)列都被刪除掉之后自動(dòng)刪除掉自身。
- 惰性:如果沒有聲明交換器,那么在執(zhí)行到使用的時(shí)候會(huì)導(dǎo)致異常,并不會(huì)主動(dòng)聲明。
如果沒有隊(duì)列綁定在交換機(jī)上,則發(fā)送到該交換機(jī)上的消息會(huì)丟失。
一個(gè)交換機(jī)可以綁定多個(gè)隊(duì)列,一個(gè)隊(duì)列可以被多個(gè)交換機(jī)綁定。
topic 類型交換器通過模式匹配分析消息的 routing-key 屬性。它將 routing-key 和 binding-key 的字符串切分成單詞。這些單詞之間用點(diǎn)隔開。它同樣也會(huì)識(shí)別兩個(gè)通配符:#匹配0個(gè)或者多個(gè)單詞,*匹配一個(gè)單詞。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。
因?yàn)榻粨Q器是命名實(shí)體,聲明一個(gè)已經(jīng)存在的交換器,但是試圖賦予不同類型是會(huì)導(dǎo)致錯(cuò)誤??蛻舳诵枰?jiǎng)h除這個(gè)已經(jīng)存在的交換器,然后重新聲明并且賦予新的類型。
6. 在 applicationContext.xml 中聲明一個(gè)隊(duì)列,name 屬性是需要配置到 RabbitMQ 服務(wù)器的。
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
- durable:是否持久化
- exclusive:僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除
- auto-delete:當(dāng)所有消費(fèi)端連接斷開后,是否自動(dòng)刪除隊(duì)列
7. 創(chuàng)建生產(chǎn)者端
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Description: 消息隊(duì)列發(fā)送者
* @Author:
* @CreateTime:
*/
@Service
public class Producer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendQueue(String exchange_key, String queue_key, Object object) {
// convertAndSend 將Java對象轉(zhuǎn)換為消息發(fā)送至匹配key的交換機(jī)中Exchange
amqpTemplate.convertAndSend(exchange_key, queue_key, object);
}
}
8. 在 applicationContext.xml 中配置監(jiān)聽及消費(fèi)者端
<!-- 消費(fèi)者 --> <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean> <!-- 配置監(jiān)聽 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <!-- queues 監(jiān)聽隊(duì)列,多個(gè)用逗號(hào)分隔 ref 監(jiān)聽器 --> <rabbit:listener queues="test_queue" ref="rabbitmqService"/> </rabbit:listener-container>
消費(fèi)者 Java 代碼:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class RabbitmqService implements MessageListener {
public void onMessage(Message message) {
System.out.println("消息消費(fèi)者 = " + message.toString());
}
}
至此,我們的所有配置文件就寫完了,最終如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- RabbitMQ start -->
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
password="${mq.password}" port="${mq.port}" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 消息隊(duì)列客戶端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
<!-- queue 隊(duì)列聲明 -->
<!--
durable 是否持久化
exclusive 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除
auto-delete 當(dāng)所有消費(fèi)端連接斷開后,是否自動(dòng)刪除隊(duì)列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
<!-- 交換機(jī)定義 -->
<!--
交換機(jī):一個(gè)交換機(jī)可以綁定多個(gè)隊(duì)列,一個(gè)隊(duì)列也可以綁定到多個(gè)交換機(jī)上。
如果沒有隊(duì)列綁定到交換機(jī)上,則發(fā)送到該交換機(jī)上的信息則會(huì)丟失。
direct模式:消息與一個(gè)特定的路由器完全匹配,才會(huì)轉(zhuǎn)發(fā)
topic模式:按規(guī)則轉(zhuǎn)發(fā)消息,最靈活
-->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 設(shè)置消息Queue匹配的pattern (direct模式為key) -->
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
<!-- 配置監(jiān)聽 消費(fèi)者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<!--
queues 監(jiān)聽隊(duì)列,多個(gè)用逗號(hào)分隔
ref 監(jiān)聽器 -->
<rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>
</beans>
9. 如何使用 RabbitMQ 發(fā)送一個(gè)消息
@Autowired
private Producer producer;
@Value("#{appConfig['mq.queue']}")
private String queueId;
/**
* @Description: 消息隊(duì)列
* @Author:
* @CreateTime:
*/
@ResponseBody
@RequestMapping("/sendQueue")
public String testQueue() {
try {
Map<String, Object> map = new HashMap<String, Object>();
map.put("data", "hello rabbitmq");
producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);
} catch (Exception e) {
e.printStackTrace();
}
return "發(fā)送完畢";
}
嗯。這個(gè)測試是 SpringMVC 框架。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- rabbitmq結(jié)合spring實(shí)現(xiàn)消息隊(duì)列優(yōu)先級(jí)的方法
- Spring學(xué)習(xí)筆記3之消息隊(duì)列(rabbitmq)發(fā)送郵件功能
- Spring項(xiàng)目集成RabbitMQ及自動(dòng)創(chuàng)建隊(duì)列
相關(guān)文章
Spring Cloud Alibaba和Dubbo融合實(shí)現(xiàn)
這篇文章主要介紹了Spring Cloud Alibaba和Dubbo融合實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
java實(shí)現(xiàn)文件讀寫與壓縮實(shí)例
這篇文章主要介紹了java實(shí)現(xiàn)文件讀寫與壓縮實(shí)例,有助于讀者加深對文件操作的理解,需要的朋友可以參考下2014-07-07
java命令調(diào)用虛擬機(jī)方法總結(jié)
在本篇文章里我們給大家整理了關(guān)于java中的java命令如何調(diào)用虛擬機(jī)的方法和具體步驟,需要的朋友們跟著操作下。2019-05-05
HashMap紅黑樹入門(實(shí)現(xiàn)一個(gè)簡單的紅黑樹)
紅黑樹(Red Black Tree) 是一種自平衡二叉查找樹,是在計(jì)算機(jī)科學(xué)中用到的一種數(shù)據(jù)結(jié)構(gòu),典型的用途是實(shí)現(xiàn)關(guān)聯(lián)數(shù)組。 紅黑樹發(fā)明時(shí)被稱為平衡二叉B樹,后來修改為如今的“紅黑樹”2021-06-06
Java語言實(shí)現(xiàn)數(shù)據(jù)結(jié)構(gòu)棧代碼詳解
這篇文章主要介紹了Java語言實(shí)現(xiàn)數(shù)據(jù)結(jié)構(gòu)棧代碼詳解,簡單介紹了棧的概念,然后分享了線性棧和鏈?zhǔn)綏5腏ava代碼,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11

