spring boot集成rabbitmq的實(shí)例教程
一、RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).
消息中間件的工作過程可以用生產(chǎn)者消費(fèi)者模型來表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過程如下:

從上圖可看出,對于消息隊(duì)列來說,生產(chǎn)者,消息隊(duì)列,消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來的消息,并且給予相應(yīng)的處理.消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞.
對于RabbitMQ來說,除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange).它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對應(yīng)的消息隊(duì)列.那么RabitMQ的工作流程如下所示:

緊接著說一下交換機(jī).交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類型,分別為Direct,topic,headers,Fanout.
Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會被發(fā)送到該消息隊(duì)列中.
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會被發(fā)送到該消息隊(duì)列中.
headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會指定一組鍵值對規(guī)則,而發(fā)送消息的時(shí)候也會指定一組鍵值對規(guī)則,當(dāng)兩組鍵值對規(guī)則相匹配的時(shí)候,消息會被發(fā)送到匹配的消息隊(duì)列中.
Fanout是路由廣播的形式,將會把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會被忽略.
概念:
- 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列
- 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽隊(duì)列中的對應(yīng)消息,消費(fèi)消息
- 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息
- 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
- 綁定 完成交換機(jī)和隊(duì)列之間的綁定
模式:
1、direct
直連模式,用于實(shí)例間的任務(wù)分發(fā)
2、topic
話題模式,通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列
3、headers
適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則
4、fanout
分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key
安裝
單機(jī)版安裝很簡單,大概步驟如下:
# 安裝erlang包 yum install erlang # 安裝socat yum install socat # 安裝rabbit rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm # 啟動服務(wù) rabbitmq-server start # 增加管理控制功能 rabbitmq-plugins enable rabbitmq_management # 增加用戶: sudo rabbitmqctl add_user root password rabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
集群安裝,可參考這篇文章:
以上就是rabbitmq的介紹,下面開始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過程,整理此篇文章。
二、springboot配置
廢話少說直接上代碼:
配置參數(shù)
application.yml:
spring: rabbitmq: addresses: 192.168.1.1:5672 username: username password: password publisher-confirms: true virtual-host: /
java config讀取參數(shù)
/**
* RabbitMq配置文件讀取類
*
* @author chenhf
* @create 2017-10-23 上午9:31
**/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.publisher-confirms}")
private Boolean publisherConfirms;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
// 構(gòu)建mq實(shí)例工廠
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(publisherConfirms);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
三、rabbitmq生產(chǎn)者配置
主要配置了直連和話題模式,其中話題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來驗(yàn)證匹配模式。
/**
* 用于配置交換機(jī)和隊(duì)列對應(yīng)關(guān)系
* 新增消息隊(duì)列應(yīng)該按照如下步驟
* 1、增加queue bean,參見queueXXXX方法
* 2、增加queue和exchange的binding
* @author chenhf
* @create 2017-10-23 上午10:33
**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);
/**
* @Author:chenhf
* @Description: 主題型交換機(jī)
* @Date:下午5:49 2017/10/23
* @param
* @return
*/
@Bean
TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
rabbitAdmin.declareExchange(contractTopicExchange);
logger.debug("完成主題型交換機(jī)bean實(shí)例化");
return contractTopicExchange;
}
/**
* 直連型交換機(jī)
*/
@Bean
DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
rabbitAdmin.declareExchange(contractDirectExchange);
logger.debug("完成直連型交換機(jī)bean實(shí)例化");
return contractDirectExchange;
}
//在此可以定義隊(duì)列
@Bean
Queue queueTest(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("測試隊(duì)列實(shí)例化完成");
return queue;
}
//topic 1
@Bean
Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("話題測試隊(duì)列1實(shí)例化完成");
return queue;
}
//topic 2
@Bean
Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
rabbitAdmin.declareQueue(queue);
logger.debug("話題測試隊(duì)列2實(shí)例化完成");
return queue;
}
//在此處完成隊(duì)列和交換機(jī)綁定
@Bean
Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("測試隊(duì)列與直連型交換機(jī)綁定完成");
return binding;
}
//topic binding1
@Bean
Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("測試隊(duì)列與話題交換機(jī)1綁定完成");
return binding;
}
//topic binding2
@Bean
Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
rabbitAdmin.declareBinding(binding);
logger.debug("測試隊(duì)列與話題交換機(jī)2綁定完成");
return binding;
}
}
在這里用到枚舉類:RabbitMqEnum
/**
* 定義rabbitMq需要的常量
*
* @author chenhf
* @create 2017-10-23 下午4:07
**/
public class RabbitMqEnum {
/**
* @param
* @Author:chenhf
* @Description:定義數(shù)據(jù)交換方式
* @Date:下午4:08 2017/10/23
* @return
*/
public enum Exchange {
CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),
CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對點(diǎn)");
private String code;
private String name;
Exchange(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
/**
* describe: 定義隊(duì)列名稱
* creat_user: chenhf
* creat_date: 2017/10/31
**/
public enum QueueName {
TESTQUEUE("TESTQUEUE", "測試隊(duì)列"),
TOPICTEST1("TOPICTEST1", "topic測試隊(duì)列"),
TOPICTEST2("TOPICTEST2", "topic測試隊(duì)列");
private String code;
private String name;
QueueName(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
/**
* describe: 定義routing_key
* creat_user: chenhf
* creat_date: 2017/10/31
**/
public enum QueueEnum {
TESTQUEUE("TESTQUEUE1", "測試隊(duì)列key"),
TESTTOPICQUEUE1("*.TEST.*", "topic測試隊(duì)列key"),
TESTTOPICQUEUE2("lazy.#", "topic測試隊(duì)列key");
private String code;
private String name;
QueueEnum(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
}
以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口
測試時(shí)直接調(diào)用此工具類,testUser類需自己實(shí)現(xiàn)
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
* rabbitmq發(fā)送消息工具類
*
* @author chenhf
* @create 2017-10-26 上午11:10
**/
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
/** logger */
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMqSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
logger.info("confirm: " + correlationData.getId());
}
/**
* 發(fā)送到 指定routekey的指定queue
* @param routeKey
* @param obj
*/
public void sendRabbitmqDirect(String routeKey,Object obj) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
}
/**
* 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上
* @param routeKey
* @param obj
*/
public void sendRabbitmqTopic(String routeKey,Object obj) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
}
}
四、rabbitmq消費(fèi)者配置
springboot注解方式監(jiān)聽隊(duì)列,無法手動指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫onMessage來進(jìn)行手動回調(diào),詳見以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀
直連消費(fèi)者
通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊(duì)列中消息是否被消費(fèi)
/**
* 消費(fèi)者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
@Bean("testQueueContainer")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("TESTQUEUE");
container.setMessageListener(exampleListener());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean("testQueueListener")
public ChannelAwareMessageListener exampleListener() {
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
//通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊(duì)列中消息是否被消費(fèi)
if ("2".equals(testUser.getUserName())){
System.out.println(testUser.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
if ("1".equals(testUser.getUserName())){
System.out.println(testUser.toString());
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
};
}
}
topic消費(fèi)者1
/**
* 消費(fèi)者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
@Bean("topicTest1Container")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("TOPICTEST1");
container.setMessageListener(exampleListener1());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean("topicTest1Listener")
public ChannelAwareMessageListener exampleListener1(){
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
System.out.println("TOPICTEST1:"+testUser.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
};
}
}
topic消費(fèi)者2
/**
* 消費(fèi)者配置
*
* @author chenhf
* @create 2017-10-30 下午3:14
**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
@Bean("topicTest2Container")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("TOPICTEST2");
container.setMessageListener(exampleListener());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean("topicTest2Listener")
public ChannelAwareMessageListener exampleListener() {
return new ChannelAwareMessageListener() {
@Override
public void
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實(shí)現(xiàn)Topic主題模式(推薦)
- Springboot集成RabbitMQ死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- 詳解spring boot集成RabbitMQ
- Spring Boot 3 集成 RabbitMQ 實(shí)踐指南(原理解析)
相關(guān)文章
jvm中指定時(shí)區(qū)信息user.timezone問題及解決方式
同一份程序使用時(shí)間LocalDateTime類型,在國內(nèi)和國外部署后,返回的時(shí)間信息前端使用出問題,這篇文章主要介紹了jvm中指定時(shí)區(qū)信息user.timezone問題及解決方法,需要的朋友可以參考下2023-02-02
Java中實(shí)現(xiàn)線程的三種方式及對比_動力節(jié)點(diǎn)Java學(xué)院整理
本文給大家分享了java實(shí)現(xiàn)線程的三種方式,非常不錯,具有參考借鑒價(jià)值,需要的朋友參考下吧2017-05-05
Java通過PowerMockito和Mokito進(jìn)行單元測試的實(shí)現(xiàn)
PowerMockito和Mockito都是Java語言中的測試框架,用于進(jìn)行單元測試和集成測試,本文就來詳細(xì)的介紹一下通過PowerMockito和Mokito進(jìn)行單元測試,感興趣的可以了解一下2023-08-08
詳解springboot-mysql-pagehelper分頁插件集成
這篇文章主要介紹了springboot-mysql-pagehelper分頁插件集成,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07
SpringCloud實(shí)現(xiàn)服務(wù)調(diào)用feign與熔斷hystrix和網(wǎng)關(guān)gateway詳細(xì)分析
這篇文章主要介紹了SpringCloud實(shí)現(xiàn)服務(wù)調(diào)用feign與熔斷hystrix和網(wǎng)關(guān)gateway,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-04-04
SpringBoot使用Druid數(shù)據(jù)源的配置方法
這篇文章主要介紹了SpringBoot使用Druid數(shù)據(jù)源的配置方法,文中代碼實(shí)例相結(jié)合的形式給大家介紹的非常詳細(xì),需要的朋友參考下吧2018-04-04
JPA?通過Specification如何實(shí)現(xiàn)復(fù)雜查詢
這篇文章主要介紹了JPA?通過Specification如何實(shí)現(xiàn)復(fù)雜查詢,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11

