RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例(示例代碼)
Producer生產(chǎn)者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducer {
private final static String EXCHANGE_NAME = "direct_message_exchange";
private final static String EXCHANGE_TYPE = "direct";
public static void main(String[] args) {
// 1. 創(chuàng)建連接工廠,設(shè)置連接參數(shù)
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // RabbitMQ默認(rèn)端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 2. 聲明交換機(jī) (direct類型,持久化)
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
// 3. 聲明隊(duì)列 (持久化,非獨(dú)占,連接斷開時(shí)不自動(dòng)刪除)
channel.queueDeclare("queue5", true, false, false, null);
channel.queueDeclare("queue6", true, false, false, null);
channel.queueDeclare("queue7", true, false, false, null);
// 4. 綁定隊(duì)列到交換機(jī),設(shè)置路由鍵
channel.queueBind("queue5", EXCHANGE_NAME, "order");
channel.queueBind("queue6", EXCHANGE_NAME, "order");
channel.queueBind("queue7", EXCHANGE_NAME, "course");
// 5. 準(zhǔn)備要發(fā)送的消息
String message = "你好,學(xué)相伴:www.kuangstudy.com";
// 6. 向交換機(jī)發(fā)送消息,使用路由鍵 "course"
channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8"));
System.out.println("消息發(fā)送成功!");
} catch (Exception ex) {
// 捕獲異常并打印堆棧信息
ex.printStackTrace();
System.out.println("消息發(fā)送出現(xiàn)異常...");
} finally {
// 在try-with-resources中,不再需要顯式關(guān)閉連接和通道
// 會(huì)自動(dòng)關(guān)閉連接和通道
}
}
}功能點(diǎn):
- 聲明了一個(gè)Direct類型的交換機(jī),并綁定了三個(gè)隊(duì)列(
queue5,queue6,queue7)。其中queue5和queue6都綁定到order路由鍵,而queue7綁定到course路由鍵。 - 發(fā)送了一條消息到
course路由鍵綁定的隊(duì)列中(即queue7)。
Consumer消費(fèi)者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "queue7"; // 與生產(chǎn)者的綁定一致
private final static String EXCHANGE_NAME = "direct_message_exchange";
private final static String EXCHANGE_TYPE = "direct";
public static void main(String[] args) {
// 1. 創(chuàng)建連接工廠,設(shè)置連接參數(shù)
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // RabbitMQ默認(rèn)端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 2. 聲明交換機(jī)和隊(duì)列,與生產(chǎn)者保持一致
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 3. 綁定隊(duì)列到交換機(jī),路由鍵為"course"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course");
System.out.println(" [*] 等待接收消息...");
// 4. 定義接收消息的回調(diào)函數(shù)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收到的消息: '" + message + "'");
// 這里可以添加進(jìn)一步的消息處理邏輯
};
// 5. 開始消費(fèi)消息 (自動(dòng)應(yīng)答)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (Exception ex) {
// 捕獲異常并打印堆棧信息
ex.printStackTrace();
System.out.println("消費(fèi)者運(yùn)行中出現(xiàn)異常...");
}
}
}功能點(diǎn):
1. 與生產(chǎn)者保持一致:消費(fèi)者的隊(duì)列名稱、交換機(jī)名稱和路由鍵與生產(chǎn)者保持一致,即監(jiān)聽queue7隊(duì)列,并接收路由鍵為course的消息。
2. 回調(diào)函數(shù)處理消息:使用DeliverCallback來定義收到消息后的處理邏輯。在回調(diào)函數(shù)中,delivery.getBody()獲取消息內(nèi)容,隨后可以對(duì)消息進(jìn)行處理、存儲(chǔ)或其他業(yè)務(wù)邏輯操作。
3 自動(dòng)應(yīng)答:basicConsume中的true表示自動(dòng)應(yīng)答(auto-acknowledge),即消息處理完畢后,RabbitMQ會(huì)自動(dòng)確認(rèn)消息已成功處理。如果需要手動(dòng)應(yīng)答,可以將true替換為false,并在處理完成后調(diào)用channel.basicAck()來手動(dòng)確認(rèn)消息。
到此這篇關(guān)于RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例的文章就介紹到這了,更多相關(guān)RabbitMQ Direct Exchange消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java解決LocalDateTime傳輸前端為時(shí)間的數(shù)組
本文主要介紹了Java解決LocalDateTime傳輸前端為時(shí)間的數(shù)組,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-03-03
Logger.getLogger()與LogFactory.getLog()的區(qū)別詳解
LogFactory來自common-logging包。如果用LogFactory.getLog,你可以用任何實(shí)現(xiàn)了通用日志接口的日志記錄器替換log4j,而程序不受影響2013-09-09
用SpringBoot Admin監(jiān)控SpringBoot程序
這篇文章主要介紹了用SpringBoot Admin監(jiān)控SpringBoot程序,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下2020-10-10
JAVA對(duì)稱加密算法PBE定義與用法實(shí)例分析
這篇文章主要介紹了JAVA對(duì)稱加密算法PBE定義與用法,結(jié)合實(shí)例形式分析了JAVA對(duì)稱加密算法PBE的概念、原理、定義及使用方法,需要的朋友可以參考下2019-09-09
Spring?Boot?多數(shù)據(jù)源處理事務(wù)的思路詳解
這篇文章主要介紹了Spring?Boot?多數(shù)據(jù)源如何處理事務(wù),本文單純就是技術(shù)探討,要從實(shí)際應(yīng)用中來說的話,我并不建議這樣去玩分布式事務(wù)、也不建議這樣去玩多數(shù)據(jù)源,畢竟分布式事務(wù)主要還是用在微服務(wù)場(chǎng)景下,對(duì)Spring?Boot?多數(shù)據(jù)源事務(wù)相關(guān)知識(shí)感興趣的朋友參考下本文2022-06-06
深入解析Java編程中的StringBuffer與StringBuider
這篇文章主要介紹了Java編程中的StringBuffer與StringBuider,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09
java中使用@Transactional會(huì)有哪些坑
在Java中,@Transactional是一個(gè)常用的注解,用于聲明方法應(yīng)該在一個(gè)事務(wù)的上下文中執(zhí)行,本文主要介紹了java中使用@Transactional會(huì)有哪些坑,感興趣的可以了解一下2024-04-04
IDEA提示:Boolean method ‘xxx‘ is always&nb
這篇文章主要介紹了IDEA提示:Boolean method ‘xxx‘ is always inverted問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08

