SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
一、RabbitTemplate 的使用
1.【導(dǎo)入依賴】
<!-- rabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.1</version> </dependency>
2.【添加配置】
rabbitmq: host: #ip地址 port: 5672 #端口 username: guest password: guest virtual-host: / listener: simple: prefetch: 1 # 默認(rèn)每次取出一條消息消費(fèi), 消費(fèi)完成取下一條 acknowledge-mode: manual # 設(shè)置消費(fèi)端手動ack確認(rèn) retry: enabled: true # 是否支持重試 publisher-confirm-type: correlated #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange) publisher-returns: true #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)
3.【點(diǎn)對點(diǎn)通信(隊(duì)列模式)(Point-to-Point Messaging)】
使用方式:
這種方式也被稱為隊(duì)列(Queue)模型。消息發(fā)送者(Producer)發(fā)送消息到隊(duì)列,然后消息接收者(Consumer)從隊(duì)列中獲取消息進(jìn)行處理。這種模型下,每個(gè)消息只有一個(gè)消費(fèi)者可以接收,確保消息的可靠傳遞和順序處理。
代碼示例: 生產(chǎn)者
/** * 第一種模型: 簡單模型 * 一個(gè)消息生產(chǎn)者 一個(gè)隊(duì)列 一個(gè)消費(fèi)者 * @return */ @GetMapping("hello/world") public void helloWorld() { SysUser sysUser = new SysUser(); // 發(fā)送消息 // 第一個(gè)參數(shù): String routingKey 路由規(guī)則 【交換機(jī) 和隊(duì)列的綁定規(guī)則 】 隊(duì)列名稱 // 第二個(gè)參數(shù): object message 消息的內(nèi)容 // rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!"); /// MessagePostProcessor 消息包裝器 如果需要對消息進(jìn)行包裝 rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> { // 設(shè)置唯一的標(biāo)識 message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); return message; });
消費(fèi)者
import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component @Log4j2 public class HelloWorldConsumer { @Autowired private StringRedisTemplate redisTemplate; /** * 監(jiān)聽 hello_world_queue 隊(duì)列消費(fèi)消息 * queues 監(jiān)聽隊(duì)列的名稱 要求這個(gè)隊(duì)列必須是已經(jīng)存在的隊(duì)列 * queuesToDeclare 監(jiān)聽隊(duì)列 如果這個(gè)隊(duì)列不存在 則 rabbitMQ 中 RabbitAdmin 會幫助去構(gòu)建這個(gè)隊(duì)列 */ @RabbitListener(queuesToDeclare = @Queue("hello_world_queue")) public void helloWorldConsumer(String msg, Message message, Channel channel) { // 獲取消息的唯一標(biāo)識 String messageId = message.getMessageProperties().getMessageId(); // 將消息添加到 Redis的set集合中 set 不能重復(fù)的 方法的返回值 添加成功的數(shù)量 Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId); if (count != null && count == 1) { // 沒有消費(fèi)過 正常消費(fèi) log.info("hello_world_queue隊(duì)列消費(fèi)者接收到了消息,消息內(nèi)容:{}", message); } } }
4.【發(fā)布/訂閱模式(Publish/Subscribe Messaging)】
使用方式:
在發(fā)布/訂閱模式中,消息發(fā)送者將消息發(fā)布到交換機(jī)(Exchange),而不是直接發(fā)送到隊(duì)列。交換機(jī)負(fù)責(zé)將消息路由到一個(gè)或多個(gè)綁定的隊(duì)列中。每個(gè)訂閱者(Subscriber)可以選擇訂閱它感興趣的消息隊(duì)列,從而接收消息。
代碼示例: 生產(chǎn)者
/** * 工作隊(duì)列 * 一個(gè)生產(chǎn)者 一個(gè)隊(duì)列 多個(gè)消費(fèi)者 */ @GetMapping("work/queue") public void workQueue() { for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!"); } }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class WorkQueueConsumer { /*** * 消費(fèi)者1 * @param message */ @RabbitListener(queuesToDeclare = @Queue("work_queue")) public void workQueueConsumer(String message) throws InterruptedException { Thread.sleep(200); log.info("work_queue隊(duì)列消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /*** * 消費(fèi)者2 * @param message */ @RabbitListener(queuesToDeclare = @Queue("work_queue")) public void workQueueConsumer2(String message) throws InterruptedException { Thread.sleep(400); log.info("work_queue隊(duì)列消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } }
5.【工作隊(duì)列模式(Work Queues)】
使用方式:
工作隊(duì)列模式也稱為任務(wù)隊(duì)列(Task Queues),它可以用來實(shí)現(xiàn)任務(wù)的異步處理。多個(gè)工作者(Worker)同時(shí)監(jiān)聽同一個(gè)隊(duì)列,當(dāng)有新的任務(wù)消息被發(fā)送到隊(duì)列中時(shí),空閑的工作者會獲取并處理這些任務(wù),確保任務(wù)能夠并行處理而不會重復(fù)執(zhí)行。
代碼示例: 生產(chǎn)者
/** * 發(fā)布訂閱 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) fanout */ @GetMapping("publish/subscribe") public void publishSubscribe() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 如果是發(fā)布訂閱模式 那么這個(gè)規(guī)則默認(rèn)不寫 只需要交換機(jī)和隊(duì)列綁定即可不需要規(guī)則 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("publish_subscribe_exchange", "", "hello publisher subscribe!!"); }
消費(fèi)者
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class PublisherSubscribeConsumer { private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class); /** * 發(fā)布訂閱模型消費(fèi)者 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"), exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT))) public void publisherSubscribe(String message) { log.info("發(fā)布訂閱模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /** * 發(fā)布訂閱模型消費(fèi)者 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"), exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT))) public void publisherSubscribe2(String message) { log.info("發(fā)布訂閱模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } }
6.【路由模式(Routing)】
使用方式:
路由模式允許發(fā)送者根據(jù)消息的路由鍵(Routing Key)將消息路由到特定的隊(duì)列。發(fā)送者將消息發(fā)送到交換機(jī),并且通過設(shè)置不同的路由鍵,使消息能夠被交換機(jī)路由到不同的隊(duì)列。消費(fèi)者可以根據(jù)需要選擇監(jiān)聽哪些隊(duì)列來接收消息。
代碼示例: 生產(chǎn)者
/** * 路由模型 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) direct */ @GetMapping("routing") public void routing() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 字符串 隨意 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("routing_exchange", "aaa", "hello routing!!"); }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class RoutingConsumer { /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "abc", "error", "info" })) public void routingConsumer(String message) { log.info("路由模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "aaa", "ccc", "waadaffas" })) public void routingConsumer2(String message) { log.info("路由模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "bbbb", "asdfasd", "asdfasdf" })) public void routingConsumer3(String message) { log.info("路由模型消費(fèi)者3接收到了消息,消息內(nèi)容:{}", message); } }
7.【主題模式(Topics)】
使用方式:
主題模式是路由模式的一種擴(kuò)展,它允許發(fā)送者根據(jù)消息的多個(gè)屬性(如主題)將消息路由到一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)(Topic Exchange)使用通配符匹配路由鍵與隊(duì)列綁定鍵的模式,從而實(shí)現(xiàn)更靈活的消息路由和過濾。
代碼示例: 生產(chǎn)者
/** * 主題模型 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) topic */ @GetMapping("topic") public void topic() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 多個(gè)單詞 以 “.” 拼起來 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name", "hello topic!!"); }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class TopicConsumer { /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "abc.*", "error.*.info", "#.name" })) public void topicConsumer(String message) { log.info("xxxxxxxxx1"); } /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "abc.*", "username" })) public void topicConsumer2(String message) { log.info("xxxxxxxxx2"); } /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "bwie.*", "error.*.info" })) public void topicConsumer3(String message) { log.info("xxxxxxxxx3"); } }
到此這篇關(guān)于SpringBoot 整合 RabbitMQ 的使用的文章就介紹到這了,更多相關(guān)SpringBoot 整合 RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- 一文掌握Springboot集成RabbitMQ的方法
- Springboot 配置RabbitMQ文檔的方法步驟
- springboot整合rabbitmq的示例代碼
- SpringBoot中連接多個(gè)RabbitMQ的方法詳解
- springboot3.0整合rabbitmq3.13的實(shí)現(xiàn)示例
相關(guān)文章
java的Map集合中按value值進(jìn)行排序輸出的實(shí)例代碼
下面小編就為大家?guī)硪黄猨ava的Map集合中按value值進(jìn)行排序輸出的實(shí)例代碼。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-08-08解析Java中的定時(shí)器及使用定時(shí)器制作彈彈球游戲的示例
這篇文章主要介紹了Java中的定時(shí)器及使用定時(shí)器制作彈彈球游戲的示例,文中同時(shí)也分析了定時(shí)器timer的缺點(diǎn)及相關(guān)替代方案,需要的朋友可以參考下2016-02-02Java基礎(chǔ)之Math和Random類知識總結(jié)
今天帶大家來學(xué)習(xí)java的Math和Random類,文中有非常詳細(xì)的代碼示例及介紹,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們很有幫助喲,需要的朋友可以參考下2021-05-05使用java實(shí)現(xiàn)“釘釘微應(yīng)用免登進(jìn)入某H5系統(tǒng)首頁“功能”
這篇文章主要介紹了用java實(shí)現(xiàn)“釘釘微應(yīng)用,免登進(jìn)入某H5系統(tǒng)首頁“功能”,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-10-10基于Java實(shí)現(xiàn)簡單的身材計(jì)算程序
這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)簡單的身材計(jì)算程序,可以計(jì)算身體的體脂率以及BMI數(shù)值等,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-12-12springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
這篇文章主要介紹了springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,本文給大家分享小編實(shí)際開發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),內(nèi)容簡單易懂,需要的朋友可以參考下2020-07-07