SpringAMQP消息隊列(SpringBoot集成RabbitMQ方式)
一、初始配置
1、導入maven坐標
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、yml配置
spring: rabbitmq: host: 你的rabbitmq的ip port: 5672 username: guest password: guest
二、基本消息隊列
1、創(chuàng)建隊列
訪問接口:http://localhost:15672,賬號密碼都為guest
進入后左下角有Add queue添加隊列,我已添加隊列為MqTest1
2、發(fā)布消息
@SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { String queue="MqTest1"; String message="message1"; rabbitTemplate.convertAndSend(queue,message); } }
此時可以看到隊列有一個消息
3、接受消息
package com.rabbitmqdemoconsumer.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitLeistener { @RabbitListener(queues = "MqTest1") public void listenSimpleQueueMessage(String msg){ System.out.println("接收到的消息:"+msg); } }
此時控制臺輸出接收到的消息
三、工作消息隊列(Work Queue)
可以提高消息處理速度,避免隊列消息堆積
1、發(fā)布消息
@SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { String queue="MqTest1"; String message="message1"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(queue,message); } } }
此時隊列有10條消息
2、接受消息
package com.rabbitmqdemoconsumer.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitLeistener { @RabbitListener(queues = "MqTest1") public void listenSimpleQueueMessage1(String msg){ System.out.println("consume1接收到的消息:"+msg); } @RabbitListener(queues = "MqTest1") public void listenSimpleQueueMessage2(String msg){ System.out.println("consume2接收到的消息:"+msg); } }
3、控制臺輸出結果
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
4、消息預取問題
但是此時有一個問題就是消息預取,比如隊列有10條消息,兩個消費者各自直接先預取5個消息,如果一個消費者接受消息的速度慢,一個快,就會導致一個消費者已經完成工作,另一個還在慢慢處理,會造成消息堆積消費者身上,要解決這個問題需要在yml文件配置相關配置
rabbitmq: host: 43.140.244.236 port: 5672 username: guest password: guest virtual-host: / listener: simple: prefetch: 1 #每次只能取一個,處理完才能取下一個消息
這樣可以避免消息預取導致堆積
四、發(fā)布訂閱模式
exchange是交換機,負責消息路由,但不存儲消息,路由失敗則消息丟失
五、發(fā)布訂閱模式之廣播模式(Fanout)
1、Fanout配置類(@Bean聲明)
package com.rabbitmqdemoconsumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanountConfig { //交換機聲明 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("FanountExchange"); } //聲明隊列1 @Bean public Queue Fanount_Qeueue1(){ return new Queue("Fanount_Qeueue1"); } //聲明隊列2 @Bean public Queue Fanount_Qeueue2(){ return new Queue("Fanount_Qeueue2"); } //綁定交換機和隊列 @Bean public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange); } @Bean public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){ return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange); } }
可以看到聲明的隊列
已經聲明的交換機(第一個)
綁定關系
2、發(fā)送消息
首先發(fā)送10條消息,經過交換機轉發(fā)到隊列
@SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads2() { String exchange="FanountExchange"; String message="message"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(exchange,"",message); } } }
此時可以看到兩個隊列各自有十條消息
3、接受消息
//監(jiān)聽交換機Fanount_Qeueue1 @RabbitListener(queues = "Fanount_Qeueue1") public void listenFanountQeueue1(String msg){ System.out.println("Fanount_Qeueue1接收到的消息:"+msg); } //監(jiān)聽交換機Fanount_Qeueue2 @RabbitListener(queues = "Fanount_Qeueue2") public void listenFanountQeueue2(String msg){ System.out.println("Fanount_Qeueue2接收到的消息:"+msg); }
控制臺結果如下(共發(fā)送20條,每個隊列10條)
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
六、發(fā)布訂閱模式之路由模式(Direct)
會將消息根據規(guī)則路由到指定的隊列
1、聲明(基于@RabbitListener聲明)
package com.rabbitmqdemoconsumer.rabbitmq; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitLeistener { /** * 綁定交換機和隊列,并為key賦值 * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "DirectQueue1"), exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void listenDirectQueue1(String msg){ System.out.println("listenDirectQueue1接收到的消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "DirectQueue2"), exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("listenDirectQueue2接收到的消息:"+msg); } }
此時可以看到聲明的隊列
聲明的交換機(第一個)
綁定關系
2、發(fā)送給blue
發(fā)送消息
@SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads2() { String exchange="DirectExchange"; String message="HelloWorld"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(exchange,"blue",message); } } }
接收消息
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
3、發(fā)送給red
發(fā)送消息
@SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads2() { String exchange="DirectExchange"; String message="HelloWorld"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(exchange,"blue",message); } } }
接收消息
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
七、發(fā)布訂閱模式之廣播模式(Topic)
Queue與Exchange指定BindingKey可以使用通配符:
- #:代指0個或多個單詞
- *:代指一個單詞
比如:
- bindingkey: china.# ->中國的所有消息
- bindingkey: #.weather ->所以國家的天氣
1、聲明
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "TopicQueue1"), exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC), key = {"china.#"} )) public void listenTopicQueue1(String msg){ System.out.println("listenTopicQueue1接收到的消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "TopicQueue2"), exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC), key = {"#.news"} )) public void listenTopicQueue2(String msg){ System.out.println("listenTopicQueue2接收到的消息:"+msg); }
隊列
交換機(第四個)
綁定關系
2、發(fā)送消息(測試1)
package com.rabbitmqdemo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads2() { String exchange="TopicExchange"; String message="HelloWorld"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(exchange,"china.news",message); } } }
接收消息
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
3、發(fā)送消息(測試2)
發(fā)送消息
package com.rabbitmqdemo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitMQDemoPublishApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads2() { String exchange="TopicExchange"; String message="HelloWorld"; for (int i=0;i<10;i++){ rabbitTemplate.convertAndSend(exchange,"china.weather",message); } } }
接收消息
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Java中Queue的poll()和remove()區(qū)別詳解
這篇文章主要介紹了Java中Queue的poll()和remove()區(qū)別詳解,Queue接口提供了許多方法,其中poll()和remove()是兩個常用的方法,它們的區(qū)別在于,當隊列為空時,poll()方法返回null,而remove()方法會拋出,需要的朋友可以參考下2023-07-07關于SpringBoot的@ConfigurationProperties注解和松散綁定、數據校驗
這篇文章主要介紹了關于SpringBoot的@ConfigurationProperties注解和松散綁定、數據校驗,@ConfigurationProperties主要作用就是將prefix屬性指定的前綴配置項的值綁定到這個JavaBean上?,通過指定的前綴,來綁定配置文件中的配置,需要的朋友可以參考下2023-05-05