RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例(示例代碼)
Producer生產(chǎn)者代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQProducer { private final static String EXCHANGE_NAME = "direct_message_exchange"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) { // 1. 創(chuàng)建連接工廠,設(shè)置連接參數(shù) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // RabbitMQ默認(rèn)端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { // 2. 聲明交換機(jī) (direct類型,持久化) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); // 3. 聲明隊(duì)列 (持久化,非獨(dú)占,連接斷開時(shí)不自動(dòng)刪除) channel.queueDeclare("queue5", true, false, false, null); channel.queueDeclare("queue6", true, false, false, null); channel.queueDeclare("queue7", true, false, false, null); // 4. 綁定隊(duì)列到交換機(jī),設(shè)置路由鍵 channel.queueBind("queue5", EXCHANGE_NAME, "order"); channel.queueBind("queue6", EXCHANGE_NAME, "order"); channel.queueBind("queue7", EXCHANGE_NAME, "course"); // 5. 準(zhǔn)備要發(fā)送的消息 String message = "你好,學(xué)相伴:www.kuangstudy.com"; // 6. 向交換機(jī)發(fā)送消息,使用路由鍵 "course" channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8")); System.out.println("消息發(fā)送成功!"); } catch (Exception ex) { // 捕獲異常并打印堆棧信息 ex.printStackTrace(); System.out.println("消息發(fā)送出現(xiàn)異常..."); } finally { // 在try-with-resources中,不再需要顯式關(guān)閉連接和通道 // 會(huì)自動(dòng)關(guān)閉連接和通道 } } }
功能點(diǎn):
- 聲明了一個(gè)Direct類型的交換機(jī),并綁定了三個(gè)隊(duì)列(
queue5
,queue6
,queue7
)。其中queue5
和queue6
都綁定到order
路由鍵,而queue7
綁定到course
路由鍵。 - 發(fā)送了一條消息到
course
路由鍵綁定的隊(duì)列中(即queue7
)。
Consumer消費(fèi)者代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class RabbitMQConsumer { private final static String QUEUE_NAME = "queue7"; // 與生產(chǎn)者的綁定一致 private final static String EXCHANGE_NAME = "direct_message_exchange"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) { // 1. 創(chuàng)建連接工廠,設(shè)置連接參數(shù) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); // RabbitMQ默認(rèn)端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { // 2. 聲明交換機(jī)和隊(duì)列,與生產(chǎn)者保持一致 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 3. 綁定隊(duì)列到交換機(jī),路由鍵為"course" channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course"); System.out.println(" [*] 等待接收消息..."); // 4. 定義接收消息的回調(diào)函數(shù) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收到的消息: '" + message + "'"); // 這里可以添加進(jìn)一步的消息處理邏輯 }; // 5. 開始消費(fèi)消息 (自動(dòng)應(yīng)答) channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (Exception ex) { // 捕獲異常并打印堆棧信息 ex.printStackTrace(); System.out.println("消費(fèi)者運(yùn)行中出現(xiàn)異常..."); } } }
功能點(diǎn):
1. 與生產(chǎn)者保持一致:消費(fèi)者的隊(duì)列名稱、交換機(jī)名稱和路由鍵與生產(chǎn)者保持一致,即監(jiān)聽queue7
隊(duì)列,并接收路由鍵為course
的消息。
2. 回調(diào)函數(shù)處理消息:使用DeliverCallback
來(lái)定義收到消息后的處理邏輯。在回調(diào)函數(shù)中,delivery.getBody()
獲取消息內(nèi)容,隨后可以對(duì)消息進(jìn)行處理、存儲(chǔ)或其他業(yè)務(wù)邏輯操作。
3 自動(dòng)應(yīng)答:basicConsume
中的true
表示自動(dòng)應(yīng)答(auto-acknowledge),即消息處理完畢后,RabbitMQ會(huì)自動(dòng)確認(rèn)消息已成功處理。如果需要手動(dòng)應(yīng)答,可以將true
替換為false
,并在處理完成后調(diào)用channel.basicAck()
來(lái)手動(dòng)確認(rèn)消息。
到此這篇關(guān)于RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例的文章就介紹到這了,更多相關(guān)RabbitMQ Direct Exchange消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springCloud集成nacos config的過(guò)程
本文介紹spring cloud集成nacos config的過(guò)程,通過(guò)實(shí)例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08Java解決LocalDateTime傳輸前端為時(shí)間的數(shù)組
本文主要介紹了Java解決LocalDateTime傳輸前端為時(shí)間的數(shù)組,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-03-03Logger.getLogger()與LogFactory.getLog()的區(qū)別詳解
LogFactory來(lái)自common-logging包。如果用LogFactory.getLog,你可以用任何實(shí)現(xiàn)了通用日志接口的日志記錄器替換log4j,而程序不受影響2013-09-09用SpringBoot Admin監(jiān)控SpringBoot程序
這篇文章主要介紹了用SpringBoot Admin監(jiān)控SpringBoot程序,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下2020-10-10JAVA對(duì)稱加密算法PBE定義與用法實(shí)例分析
這篇文章主要介紹了JAVA對(duì)稱加密算法PBE定義與用法,結(jié)合實(shí)例形式分析了JAVA對(duì)稱加密算法PBE的概念、原理、定義及使用方法,需要的朋友可以參考下2019-09-09Spring?Boot?多數(shù)據(jù)源處理事務(wù)的思路詳解
這篇文章主要介紹了Spring?Boot?多數(shù)據(jù)源如何處理事務(wù),本文單純就是技術(shù)探討,要從實(shí)際應(yīng)用中來(lái)說(shuō)的話,我并不建議這樣去玩分布式事務(wù)、也不建議這樣去玩多數(shù)據(jù)源,畢竟分布式事務(wù)主要還是用在微服務(wù)場(chǎng)景下,對(duì)Spring?Boot?多數(shù)據(jù)源事務(wù)相關(guān)知識(shí)感興趣的朋友參考下本文2022-06-06深入解析Java編程中的StringBuffer與StringBuider
這篇文章主要介紹了Java編程中的StringBuffer與StringBuider,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09java中使用@Transactional會(huì)有哪些坑
在Java中,@Transactional是一個(gè)常用的注解,用于聲明方法應(yīng)該在一個(gè)事務(wù)的上下文中執(zhí)行,本文主要介紹了java中使用@Transactional會(huì)有哪些坑,感興趣的可以了解一下2024-04-04IDEA提示:Boolean method ‘xxx‘ is always&nb
這篇文章主要介紹了IDEA提示:Boolean method ‘xxx‘ is always inverted問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08