Springboot 配置RabbitMQ文檔的方法步驟
簡(jiǎn)介
RabbitMQ是實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件的一種,用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗
概念:
- 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列
- 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽(tīng)隊(duì)列中的對(duì)應(yīng)消息,消費(fèi)消息
- 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息
- 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
- 綁定 完成交換機(jī)和隊(duì)列之間的綁定
模式:
- direct:直連模式,用于實(shí)例間的任務(wù)分發(fā)
- topic:話(huà)題模式,通過(guò)可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列
- headers:適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則
- fanout:分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key
SpringBoot集成RabbitMQ
一、引入maven依賴(lài)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version> </dependency>
二、配置application.properties
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualHost = /message-test/
三、編寫(xiě)AmqpConfiguration配置文件
package message.test.configuration; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AmqpConfiguration { /** * 消息編碼 */ public static final String MESSAGE_ENCODING = "UTF-8"; public static final String EXCHANGE_ISSUE = "exchange_message_issue"; public static final String QUEUE_ISSUE_USER = "queue_message_issue_user"; public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user"; public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device"; public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city"; public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user"; public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user"; public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device"; public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city"; public static final String EXCHANGE_PUSH = "exchange_message_push"; public static final String QUEUE_PUSH_RESULT = "queue_message_push_result"; @Autowired private RabbitProperties rabbitProperties; @Bean public Queue issueUserQueue() { return new Queue(QUEUE_ISSUE_USER); } @Bean public Queue issueAllUserQueue() { return new Queue(QUEUE_ISSUE_ALL_USER); } @Bean public Queue issueAllDeviceQueue() { return new Queue(QUEUE_ISSUE_ALL_DEVICE); } @Bean public Queue issueCityQueue() { return new Queue(QUEUE_ISSUE_CITY); } @Bean public Queue pushResultQueue() { return new Queue(QUEUE_PUSH_RESULT); } @Bean public DirectExchange issueExchange() { return new DirectExchange(EXCHANGE_ISSUE); } @Bean public DirectExchange pushExchange() { // 參數(shù)1:隊(duì)列 // 參數(shù)2:是否持久化 // 參數(shù)3:是否自動(dòng)刪除 return new DirectExchange(EXCHANGE_PUSH, true, true); } @Bean public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER); } @Bean public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER); } @Bean public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE); } @Bean public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY); } @Bean public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue, @Qualifier("pushExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).withQueueName(); } @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(rabbitProperties.getHost()); connectionFactory.setPort(rabbitProperties.getPort()); connectionFactory.setUsername(rabbitProperties.getUsername()); connectionFactory.setPassword(rabbitProperties.getPassword()); connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
三、編寫(xiě)生產(chǎn)者
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
四、編寫(xiě)消費(fèi)者
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT) public void handlePushResult(@Payload byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot中實(shí)現(xiàn)登錄攔截器的代碼實(shí)例
這篇文章主要介紹了SpringBoot中實(shí)現(xiàn)登錄攔截器的代碼實(shí)例,對(duì)于管理系統(tǒng)或其他需要用戶(hù)登錄的系統(tǒng),登錄驗(yàn)證都是必不可少的環(huán)節(jié),在SpringBoot開(kāi)發(fā)的項(xiàng)目中,通過(guò)實(shí)現(xiàn)攔截器來(lái)實(shí)現(xiàn)用戶(hù)登錄攔截并驗(yàn)證,需要的朋友可以參考下2023-10-10java通過(guò)AES生成公鑰加密數(shù)據(jù)ECC加密公鑰
這篇文章主要為大家介紹了java通過(guò)AES生成公鑰加密數(shù)據(jù)ECC加密公鑰實(shí)現(xiàn)案例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解
這篇文章主要介紹了Spring框架實(shí)現(xiàn)AOP添加日志記錄功能過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12springcloud結(jié)合bytetcc實(shí)現(xiàn)數(shù)據(jù)強(qiáng)一致性原理解析
這篇文章主要介紹了springcloud結(jié)合bytetcc實(shí)現(xiàn)數(shù)據(jù)強(qiáng)一致性原理解析,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03JAVA 統(tǒng)計(jì)字符串中中文,英文,數(shù)字,空格,特殊字符的個(gè)數(shù)
這篇文章主要介紹了JAVA 統(tǒng)計(jì)字符串中中文,英文,數(shù)字,空格,特殊字符的個(gè)數(shù) ,本文通過(guò)一段代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-06-06