Springboot RabbitMQ 消息隊列使用示例詳解
更新時間:2024年06月05日 10:12:34 作者:bj_wasin
本文通過示例代碼介紹了Springboot RabbitMQ 消息隊列使用,對大家的學習或工作具有一定的參考借鑒價值,感興趣的朋友跟隨小編一起看看吧
一、概念介紹:
RabbitMQ中幾個重要的概念介紹:
- Channels:信道,多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的 TCP 連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
- Exchanges:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務器中的隊列。
- 交換機類型主要有以下幾種:
- Direct Exchange(直連交換機):這種類型的交換機根據(jù)消息的Routing Key(路由鍵)進行精確匹配,只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景。
- Fanout Exchange(扇形交換機):這種類型的交換機采用廣播模式,它會將消息發(fā)送給所有綁定到該交換機的隊列,不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。
- Topic Exchange(主題交換機):這種類型的交換機支持基于模式匹配的路由鍵,可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現(xiàn)更復雜的消息路由邏輯。
- Headers Exchange(頭交換機):這種類型的交換機不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進行匹配。適用于需要在消息頭中攜帶額外信息的場景。
- Queues:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
二、引入依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
三、添加配置信息
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest listener: simple: acknowledge-mode: manual # 手動提交
四、Direct Exchange(直連交換機)模式
1、新建配置文件 RabbitDirectConfig類
package com.example.direct; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 直連交換機--這種類型的交換機根據(jù)消息的Routing Key(路由鍵)進行精確匹配, * 只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景 */ @Configuration public class RabbitDirectConfig { /** * 隊列名稱 */ public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE"; public static final String QUEUE_USER ="QUEUE_USER"; /** * 交換機 */ public static final String EXCHANGE="EXCHANGE_01"; /** * 路由 */ public static final String ROUTING_KEY="ROUTING_KEY_01"; @Bean public Queue queue01() { return new Queue(QUEUE_MESSAGE, //隊列名稱 true, //是否持久化 false, //是否排他 false //是否自動刪除 ); } @Bean public Queue queue02() { return new Queue(QUEUE_USER, //隊列名稱 true, //是否持久化 false, //是否排他 false //是否自動刪除 ); } @Bean public DirectExchange exchange01() { return new DirectExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding demoBinding() { return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY); } @Bean public Binding demoBinding2() { return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY); } }
2、添加消息生產(chǎn)者 Producer類
package com.example.direct; import com.example.entity.User; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class Producer { @Resource RabbitTemplate rabbitTemplate; public void sendMessageByExchangeANdRoute(String message){ rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message); } /** * 默認交換器,隱式地綁定到每個隊列,路由鍵等于隊列名稱。 * @param message */ public void sendMessageByQueue(String message){ rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message); } public void sendMessage(User user){ rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user); } }
3、添加消息消費者
package com.example.direct; import com.example.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class Consumer { @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER) public void onMessage(User user){ System.out.println("收到的實體bean消息:"+user); } @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE) public void onMessage2(String message){ System.out.println("收到的字符串消息:"+message); } }
4、 測試
package com.example; import com.example.entity.User; import com.example.direct.Producer; import com.example.fanout.FanoutProducer; import com.example.topic.TopicProducer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest class SpringbootRabbitMqApplicationTests { @Resource Producer producer; @Test public void sendMessage() throws InterruptedException { producer.sendMessageByQueue("哈哈"); producer.sendMessage(new User().setAge(10).setName("wasin")); } }
五、Topic Exchange(主題交換機)模式
1、新建RabbitTopicConfig類
package com.example.topic; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 主題交換機--這種類型的交換機支持基于模式匹配的路由鍵, * 可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現(xiàn)更復雜的消息路由邏輯。 */ @Configuration public class RabbitTopicConfig { /** * 交換機 */ public static final String EXCHANGE = "EXCHANGE_TOPIC1"; /** * 隊列名稱 */ public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC"; /** * 路由 * "*" 與 "#",用于做模糊匹配。其中 "*" 用于匹配一個單詞,"#" 用于匹配多個單詞(可以是零個) * 可以匹配 aa.wasin.aa.bb wasin.aa.bb wasin.aa .... * aa.bb.wasin.cc 無法匹配 */ public static final String ROUTING_KEY1 = "*.wasin.#"; @Bean public Queue queue() { return new Queue(QUEUE_TOPIC1, //隊列名稱 true, //是否持久化 false, //是否排他 false //是否自動刪除 ); } @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1); } }
2、新建 消息生產(chǎn)者和發(fā)送者
TopicProducer類
package com.example.topic; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class TopicProducer { @Resource RabbitTemplate rabbitTemplate; /** * @param routeKey 路由 * @param message 消息 */ public void sendMessageByQueue(String routeKey, String message){ rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message); } }
TopicConsumer類
package com.example.topic; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Slf4j @Component public class TopicConsumer { @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1) public void onMessage2(String message){ log.info("topic收到的字符串消息:{}",message); } }
六、Fanout Exchange(扇形交換機)模式
1、 新建 RabbitFanoutConfig類
package com.example.fanout; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: 扇形交換機--這種類型的交換機采用廣播模式,它會將消息發(fā)送給所有綁定到該交換機的隊列, * 不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。 */ @Configuration public class RabbitFanoutConfig { /** * 交換機 */ public static final String EXCHANGE = "EXCHANGE_FANOUT"; /** * 隊列名稱 */ public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT"; /** * 隊列名稱 */ public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2"; @Bean public Queue queueFanout1() { return new Queue(QUEUE_FANOUT1, //隊列名稱 true, //是否持久化 false, //是否排他 false //是否自動刪除 ); } @Bean public Queue queueFanout2() { return new Queue(QUEUE_FANOUT2, //隊列名稱 true, //是否持久化 false, //是否排他 false //是否自動刪除 ); } @Bean public FanoutExchange exchangeFanout() { return new FanoutExchange(EXCHANGE, true, //是否持久化 false //是否排他 ); } @Bean public Binding bindingFanout() { return BindingBuilder.bind(queueFanout1()).to(exchangeFanout()); } @Bean public Binding bindingFanout2() { return BindingBuilder.bind(queueFanout2()).to(exchangeFanout()); } }
2、新建 消息生產(chǎn)者和發(fā)送者
FanoutProducer類:
package com.example.fanout; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Component public class FanoutProducer { @Resource RabbitTemplate rabbitTemplate; /** * @param message 消息 */ public void sendMessageByQueue(String message) { rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message); } }
FanoutConsumer類
package com.example.fanout; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author wasin * @version 1.0 * @date 2024/6/4 * @description: */ @Slf4j @Component public class FanoutConsumer { /** * 手動提交 * @param message * @param channel * @param tag * @throws IOException */ @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1) public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { log.info("fanout1收到的字符串消息:{}",message); channel.basicAck(tag,false); } @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2) public void onMessage2(String message){ log.info("fanout2到的字符串消息:{}",message); } }
到此這篇關(guān)于Springboot RabbitMQ 消息隊列使用的文章就介紹到這了,更多相關(guān)Springboot RabbitMQ 消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java OpenCV利用KNN算法實現(xiàn)圖像背景移除
這篇文章主要為大家介紹了Java OpenCV利用K最鄰近(KNN,K-NearestNeighbor)分類算法實現(xiàn)圖像背景移除的示例代碼,需要的可以參考一下2022-01-01Java異常處理操作 Throwable、Exception、Error
這篇文章主要介紹了Java異常處理操作 Throwable、Exception、Error,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Spring Boot多模塊化后,服務間調(diào)用的坑及解決
這篇文章主要介紹了Spring Boot多模塊化后,服務間調(diào)用的坑及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06