springBoot整合rabbitMQ的方法詳解
引入pom
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the test-rabbitmq demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency versions without an explicit <version> are inherited from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.wxy</groupId>
    <artifactId>test-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>test-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Runtime dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test-scoped dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
測試
package com.wxy.rabbit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Integration tests that send a message to RabbitMQ and read one back through {@link RabbitTemplate}.
 *
 * <p>The tests use the JUnit 5 {@code @Test} annotation, so no JUnit 4 {@code @RunWith} runner is
 * declared; {@code @SpringBootTest} is the only class-level annotation required.
 */
@SpringBootTest
class TestRabbitmqApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends a map payload to the exchange {@code exchange.direct} with routing key {@code wxy.news}.
     */
    @Test
    public void sendmessage() {
        String exchange = "exchange.direct";
        String routingkey = "wxy.news";
        // The message body may be any Object; the template's message converter serializes it.
        Map<String, Object> msg = new HashMap<>();
        msg.put("msg", "使用mq發(fā)送消息");
        msg.put("data", Arrays.asList("helloword", 123456, true));
        rabbitTemplate.convertAndSend(exchange, routingkey, msg);
    }

    /**
     * Receives one message from the queue {@code wxy.news}, converts it, and prints the result.
     */
    @Test
    public void receive() {
        Object object = rabbitTemplate.receiveAndConvert("wxy.news");
        System.out.println(object);
    }
}
默認消息轉換類型
###############在RabbitTemplate默認使用的是SimpleMessageConverter####### private MessageConverter messageConverter = new SimpleMessageConverter(); ###############源碼(節選):使用SerializationUtils.deserialize############### public Object fromMessage(Message message) throws MessageConversionException { Object content = null; MessageProperties properties = message.getMessageProperties(); if (properties != null) { String contentType = properties.getContentType(); if (contentType != null && contentType.startsWith("text")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = this.defaultCharset; } try { content = new String(message.getBody(), encoding); } catch (UnsupportedEncodingException var8) { throw new MessageConversionException("failed to convert text-based Message content", var8); } } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { try { content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); } catch (IllegalArgumentException | IllegalStateException | IOException var7) { throw new MessageConversionException("failed to convert serialized Message content", var7); } } }
將默認消息類型轉化成自定義json格式
第一:上面SimpleMessageConverter是org.springframework.amqp.support.converter包下MessageConverter接口的一個實現類 第二:查看該接口MessageConverter下支持哪些消息轉化,ctrl+H查看該接口的所有實現類 第三:找到json相關的converter
RabbitTemplateConfigurer中定義if (this.messageConverter != null)則使用配置的messageConverter
################## if (this.messageConverter != null)則使用配置的messageConverter public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); template.setConnectionFactory(connectionFactory); if (this.messageConverter != null) { template.setMessageConverter(this.messageConverter); } template.setMandatory(this.determineMandatoryFlag()); Template templateProperties = this.rabbitProperties.getTemplate(); if (templateProperties.getRetry().isEnabled()) { template.setRetryTemplate((new RetryTemplateFactory(this.retryTemplateCustomizers)).createRetryTemplate(templateProperties.getRetry(), Target.SENDER)); } templateProperties.getClass(); map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout); templateProperties.getClass(); map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); templateProperties.getClass(); map.from(templateProperties::getExchange).to(template::setExchange); templateProperties.getClass(); map.from(templateProperties::getRoutingKey).to(template::setRoutingKey); templateProperties.getClass(); map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); }
配置一個MessageConverter(org.springframework.amqp.support.converter包中的)
package com.wxy.rabbit.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that contributes a JSON-based {@link MessageConverter} bean.
 */
@Configuration
public class MessageConverConfig {

    /**
     * Creates the message converter bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}, exposed under the {@link MessageConverter} type
     */
    @Bean
    public MessageConverter getMessageConvert() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
再次發送消息,消息體為json格式
使用注解@RabbitListener監聽
監聽多個隊列
@RabbitListener(queues = {"wxy.news","wxy.emps"})
監聽單個隊列
@RabbitListener(queues = "wxy.news")
package com.wxy.rabbit.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * Service whose methods are registered as listeners on the queues {@code wxy.news} and
 * {@code wxy.emps}.
 *
 * <p>NOTE(review): both listener methods subscribe to the same two queues, so presumably each
 * message reaches only one of them — confirm this is intended for the demo.
 */
@Service
public class RabbitMqReceiveService {

    /**
     * Prints a fixed notice whenever a message arrives; the message itself is not inspected.
     */
    @RabbitListener(queues = {"wxy.news", "wxy.emps"})
    public void getReceiveMessage() {
        System.out.println("監(jiān)聽到性的消息");
    }

    /**
     * Prints the body and the properties of the received message.
     *
     * @param message the raw AMQP message handed to the listener
     */
    @RabbitListener(queues = {"wxy.news", "wxy.emps"})
    public void getReceiveMessageHead(Message message) {
        // getBody() returns a byte[]; printing the array directly would only show its identity
        // (e.g. "[B@1b2c3d"), so decode it to text first.
        // Assumes a UTF-8 text payload such as JSON — TODO confirm against the configured converter.
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        System.out.println(message.getMessageProperties());
    }
}
在程序中創建隊列、交換器,並進行綁定
@Test public void create() { //創(chuàng)建一個點對點的交換器 amqpAdmin.declareExchange(new DirectExchange("amqpexchange.direct")); //創(chuàng)建一個隊列 // String name,:隊列名稱 // boolean durable :持久化 amqpAdmin.declareQueue(new Queue("amqp.queue",true)); //綁定 //String destination, Binding.DestinationType destinationType, String exchange, String routingKey // @Nullable Map<String, Object> arguments amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE, "amqpexchange.direct","wxy.news", null)); }
到此這篇關於springBoot整合rabbitMQ的方法詳解的文章就介紹到這了,更多相關springBoot整合rabbitMQ內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持腳本之家!
相關文章
Java中基於maven實現zxing二維碼功能
這篇文章主要介紹了Java中基於maven實現zxing二維碼功能,非常不錯,具有參考借鑒價值,需要的朋友可以參考下 2017-02-02