RabbitMQ的基礎(chǔ)知識
RabbitMQ
1.對MQ的介紹
1.說明是MQ
MQ(message queue),從字面意思上看,本質(zhì)是個隊列,F(xiàn)IFO 先入先出,只不過隊列中存放的內(nèi)容是
message 而已,還是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中,MQ 是一種非常常
見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。使用了 MQ 之后,消息發(fā)送上游只需要依賴 MQ,不
用依賴其他服務(wù)。
2.MQ的好處
1.流量消峰
舉個例子,如果訂單系統(tǒng)最多能處理一萬次訂單,這個處理能力應(yīng)付正常時段的下單時綽綽有余,正常時段我們下單一秒后就能返回結(jié)果。但是在高峰期,如果有兩萬次下單操作系統(tǒng)是處理不了的,只能限制訂單超過一萬后不允許用戶下單。使用消息隊列做緩沖,我們可以取消這個限制,把一秒內(nèi)下的訂單分散成一段時間來處理,這時有些用戶可能在下單十幾秒后才能收到下單成功的操作,但是比不能下單的體驗要好。
2.應(yīng)用解耦
以電商應(yīng)用為例,應(yīng)用中有訂單系統(tǒng)、庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng)。用戶創(chuàng)建訂單后,如果耦合調(diào)用庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng),任何一個子系統(tǒng)出了故障,都會造成下單操作異常。當(dāng)轉(zhuǎn)變成基于消息隊列的方式后,系統(tǒng)間調(diào)用的問題會減少很多,比如物流系統(tǒng)因為發(fā)生故障,需要幾分鐘來修復(fù)。在這幾分鐘的時間里,物流系統(tǒng)要處理的內(nèi)存被緩存在消息隊列中,用戶的下單操作可以正常完成。當(dāng)物流系統(tǒng)恢復(fù)后,繼續(xù)處理訂單信息即可,用戶感受不到物流系統(tǒng)的故障,提升系統(tǒng)的可用性。

- 異步處理
有些服務(wù)間調(diào)用是異步的,例如 A 調(diào)用 B,B 需要花費(fèi)很長時間執(zhí)行,但是 A 需要知道 B 什么時候可以執(zhí)行完,以前一般有兩種方式,A 過一段時間去調(diào)用 B 的查詢 api 查詢?;蛘?A 提供一個 callback api, B 執(zhí)行完之后調(diào)用 api 通知 A 服務(wù)。這兩種方式都不是很優(yōu)雅,使用消息總線,可以很方便解決這個問題,A 調(diào)用 B 服務(wù)后,只需要監(jiān)聽 B 處理完成的消息,當(dāng) B 處理完成后,會發(fā)送一條消息給 MQ,MQ 會將此消息轉(zhuǎn)發(fā)給 A 服務(wù)。這樣 A 服務(wù)既不用循環(huán)調(diào)用 B 的查詢 api,也不用提供 callback api。同樣 B 服務(wù)也不用做這些操作。A 服務(wù)還能及時的得到異步處理成功的消息。

2.RabbitMQ的六種模式 及工作原理
工作模式
依次是:hello world ,工作模式,發(fā)布訂閱模式,路由模式,主題模式,發(fā)布確認(rèn)模式

工作原理

Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)
依賴
<!--rabbitmq 依賴客戶端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
3.hello world隊列

1.生產(chǎn)者
public class Producer {
//建立隊列
private static final String QUEUE_NAME="hello";
public static void main(String[] args) {
//創(chuàng)建連接工場
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest");
try {
//建立連接和信道
//channel 實現(xiàn)了自動 close 接口 自動關(guān)閉 不需要顯示關(guān)閉
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
/**
* 生成一個隊列,并將信道和隊列連接
* 1.隊列名稱
* 2.隊列里面的消息是否持久化 默認(rèn)消息存儲在內(nèi)存中
* 3.該隊列是否只供一個消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個消費(fèi)者消費(fèi)
* 4.是否自動刪除 最后一個消費(fèi)者端開連接以后 該隊列是否自動刪除 true 自動刪除
* 5.其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 發(fā)送一個消息
* 1.發(fā)送到那個交換機(jī)
* 2.路由的 key 是哪個
* 3.其他的參數(shù)信息
* 4.發(fā)送消息的消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息發(fā)送成功");
}catch (Exception e){
e.printStackTrace();
}
}
}
消費(fèi)者
public class Consumer {
//定義隊列名
private static final String QUEUE_NAME="hello";
public static void main(String[] args) {
//建立連接和信道
try {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
System.out.println("等待接收消息");
/**
*1.同一個會話, consumerTag 是固定的 可以做此會話的名字, deliveryTag 每次接收消息+1,可以做此消息處理通道的名字。
*2.包含消息的字節(jié)形式的類
*/
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message=new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消費(fèi)被取消");
};
/* 消費(fèi)者消費(fèi)消息
* 1.消費(fèi)哪個隊列
* 2.消費(fèi)成功之后是否要自動應(yīng)答 true 代表自動應(yīng)答 false 手動應(yīng)答
* 3.消費(fèi)者未成功消費(fèi)的回調(diào)
* 4.消費(fèi)者取消消費(fèi)的回調(diào)
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}catch (Exception e){
e.printStackTrace();
}
}
}
4.工作隊列模式

生產(chǎn)者
public class Producer {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel=RabbitMQChannelUtil.getChannel();
if(channel == null){
System.out.println("失敗");
return;
}
channel.queueDeclare(RabbitMQChannelUtil.QUEUE_NAME,false,false,false,null);
int i=0;
while (true){
String message="消息"+i;
i++;
/**
* 發(fā)送一個消息
* 1.發(fā)送到那個交換機(jī)
* 2.路由的 key 是哪個
* 3.其他的參數(shù)信息
* 4.發(fā)送消息的消息體
*/
channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME,null,message.getBytes());
System.out.println(message);
Thread.sleep(500);
}
}
}
消費(fèi)者
public class Consumer {
public static void main(String[] args) {
Channel channel=RabbitMQChannelUtil.getChannel();
if(channel == null){
System.out.println("消費(fèi)失敗");
return;
}
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message=new String(delivery.getBody());
System.out.println(Thread.currentThread().getName()+"消費(fèi)了"+message);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消費(fèi)被取消");
};
Thread[] threads=new Thread[5];
for (int i = 0; i <threads.length ; i++) {
threads[i]=new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"啟動等待消費(fèi)");
channel.basicConsume(RabbitMQChannelUtil.QUEUE_NAME,true,deliverCallback,cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
});
}
for (int i = 0; i <threads.length ; i++) {
threads[i].start();
}
}
}
5.消息應(yīng)答機(jī)制
認(rèn)識
消費(fèi)者處理消息時,可能在處理過程中掛掉,那么消息就會丟失為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了rabbitmq 可以把該消息刪除了。
自動應(yīng)答
消息發(fā)送后立即被認(rèn)為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因為這種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了
手動應(yīng)答
- Channel.basicAck(用于肯定確認(rèn))
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
- Channel.basicNack(用于否定確認(rèn))
- Channel.basicReject(用于否定確認(rèn))
與 Channel.basicNack 相比少一個參數(shù),不處理該消息了直接拒絕,可以將其丟棄了
Channel.basicNack參數(shù)中Multiple(批量應(yīng)答) 的解釋
multiple 的 true 和 false 代表不同意思
- true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
比如說 channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時
5-8 的這些還未應(yīng)答的消息都會被確認(rèn)收到消息應(yīng)答
- false
只會應(yīng)答 tag=8 的消息 5,6,7 這三個消息依然不會被確認(rèn)收到消息應(yīng)答

消息手動應(yīng)答的代碼
- 將手動應(yīng)答開啟
/* 消費(fèi)者消費(fèi)消息 * 1.消費(fèi)哪個隊列 * 2.消費(fèi)成功之后是否要自動應(yīng)答 true 代表自動應(yīng)答 false 手動應(yīng)答 * 3.當(dāng)一個消息發(fā)送過來后的回調(diào)接口 * 4.消費(fèi)者取消消費(fèi)的回調(diào) */ boolean ack=false; channel.basicConsume(QUEUE_NAME,ack,deliverCallback,cancelCallback);
- 消息消費(fèi)回調(diào)時,使用手動應(yīng)答
/**
* 消息發(fā)送過來后的回調(diào)接口
*1.同一個會話, consumerTag 是固定的 可以做此會話的名字, deliveryTag 每次接收消息+1,可以做此消息處理通道的名字。
*2.消息類
*/
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message=new String(delivery.getBody());
System.out.println(message);
/**
* 參數(shù)說明
* 1.消息的標(biāo)記tag
* 2.是否批量應(yīng)答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
消息自動進(jìn)行重新入隊
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個消費(fèi)者。這樣,即使某個消費(fèi)者偶爾死亡,也可以確保不會丟失任何消息。

6.RabbitMQ的持久化,不公平分發(fā)及預(yù)取值
概念
剛剛我們已經(jīng)看到了如何處理任務(wù)不丟失的情況,但是如何保障當(dāng) RabbitMQ 服務(wù)停掉以后消息生產(chǎn)者發(fā)送過來的消息不丟失。默認(rèn)情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標(biāo)記為持久化。
隊列持久化
- 之前我們創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的化,該隊列就會被刪除掉,如果要隊列實現(xiàn)持久化 需要在聲明隊列的時候把 durable(第二個) 參數(shù)設(shè)置為持久化
- 但是需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤
channel.queueDeclare(RabbitMQChannelUtil.QUEUE_NAME,true,false,false,null);

這個就是持久化隊列
消息持久化
- 要想讓消息實現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性。
隊列持久化為false時:
channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME,null,message.getBytes());
隊列持久化為true時
channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
- 將消息標(biāo)記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是
這里依然存在當(dāng)消息剛準(zhǔn)備存儲在磁盤的時候 但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒
有真正寫入磁盤。持久性保證并不強(qiáng).更強(qiáng)的持久化后面發(fā)布確認(rèn)會講到
不公平分發(fā)
在最開始的時候我們學(xué)習(xí)到 RabbitMQ 分發(fā)消息采用的輪訓(xùn)分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費(fèi)者在處理任務(wù),其中有個消費(fèi)者 1 處理任務(wù)的速度非常快,而另外一個消費(fèi)者 2處理速度卻很慢,這個時候我們還是采用輪訓(xùn)分發(fā)的化就會到這處理速度快的這個消費(fèi)者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費(fèi)者一直在干活,這種分配方式在這種情況下其實就不太好,但是RabbitMQ 并不知道這種情況它依然很公平的進(jìn)行分發(fā)。
為了避免這種情況,我們設(shè)置不公平分發(fā):
channel.basicQos(1);

預(yù)取值
本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel 上肯定不止只有一個消息另外來自消費(fèi)者的手動確認(rèn)本質(zhì)上也是異步的。因此這里就存在一個未確認(rèn)的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。這個時候就可以通過使用 basic.qos 方法設(shè)置“預(yù)取計數(shù)”值來完成的。該值定義通道上允許的未確認(rèn)消息的最大數(shù)量。一旦數(shù)量達(dá)到配置的數(shù)量,RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認(rèn)
prefetch就是預(yù)取值數(shù)

7.發(fā)布確認(rèn)
上文持久化中提到,當(dāng)消息持久化存入RabbitMQ磁盤時,RabbitMQ突然宕機(jī),則消息未成功存入,會發(fā)生消息丟失。所以發(fā)布確認(rèn)即:在消息成功存入磁盤時,返還給生產(chǎn)者一個消息,確認(rèn)已經(jīng)存入磁盤
具體介紹
生產(chǎn)者將信道設(shè)置成 confirm 模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊列了,如果消息和隊列是可持久化的,那么確認(rèn)消息會在將消息寫入磁盤之后發(fā)出,broker 回傳
給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號,此外 broker 也可以設(shè)置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理。
為了保證消息不丟失:
- 開啟隊列持久化
- 開啟消息持久化
- 開啟信道的發(fā)布確認(rèn)
開啟發(fā)布確認(rèn)的方法
channel.confirmSelect();
發(fā)布確認(rèn)的模式
單個確認(rèn)發(fā)布
public static void singleConfirm(){
try {
Channel channel=RabbitMQChannelUtil.getChannel();
if(channel == null){
System.out.println("信道建立失敗");
return;
}
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
long begin=System.currentTimeMillis();
for (int i = 0; i <MESSAGE_COUNT ; i++) {
String message=i+"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//可以加時間參數(shù),當(dāng)消息發(fā)送失敗或超過參數(shù)時間沒成功,則返回false
boolean flag=channel.waitForConfirms();
//如果失敗可以重發(fā)
if(flag){
System.out.println(message+"發(fā)送成功");
}else {
//這里可以實現(xiàn)重發(fā)
System.out.println(message+"發(fā)送失敗");
}
}
long end=System.currentTimeMillis();
System.out.println("發(fā)送"+MESSAGE_COUNT+"條消息,耗時"+(end-begin)+"ms");
}catch (Exception e){
e.printStackTrace();
}
}
發(fā)布一個消息之后只有它被確認(rèn)發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個方法只有在消息被確認(rèn)的時候才返回,如果在指定時間范圍內(nèi)這個消息沒有被確認(rèn)那么它將拋出異常。
缺點:速度太慢
2.批量發(fā)布確認(rèn)模式
public static void batchConfirm(){
try {
Channel channel=RabbitMQChannelUtil.getChannel();
if(channel == null){
System.out.println("建立連接失敗");
return;
}
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//當(dāng)100條消息發(fā)布成功時,再確認(rèn)
int ackMessageCount=100;
//未確認(rèn)的消息個數(shù)
int needAckMessageCount=0;
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
long begin=System.currentTimeMillis();
for (int i = 0; i <MESSAGE_COUNT ; i++) {
String message=i+"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
needAckMessageCount++;
if(needAckMessageCount == ackMessageCount){
//確認(rèn)
channel.waitForConfirms();
needAckMessageCount=0;
}
}
//判斷可能還有消息未發(fā)送,再發(fā)送依次
if(needAckMessageCount > 0){
channel.waitForConfirms();
}
long end= System.currentTimeMillis();
System.out.println("發(fā)送"+MESSAGE_COUNT+"條消息,耗時"+(end-begin)+"ms");
}catch (Exception e){
e.printStackTrace();
}
}
缺點:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,不知道是哪個消息出現(xiàn)問題
3.異步確認(rèn)發(fā)布
原理
有單獨(dú)一個隊列保存確認(rèn)信號
public static void asyncConfirm() throws Exception {
try (Channel channel = RabbitMQChannelUtil.getChannel()) {
if(channel == null){
return;
}
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//開啟發(fā)布確認(rèn)
channel.confirmSelect();
/**
* 線程安全有序的一個哈希表,適用于高并發(fā)的情況
* 1.輕松的將序號與消息進(jìn)行關(guān)聯(lián)
* 2.輕松批量刪除條目 只要給到序列號
* 3.支持并發(fā)訪問
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 確認(rèn)收到消息的一個回調(diào)
* 1.消息序列號
* 2.true 可以確認(rèn)小于等于當(dāng)前序列號的消息
* false 確認(rèn)當(dāng)前序列號消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于當(dāng)前序列號的未確認(rèn)消息 是一個 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認(rèn)消息
confirmed.clear();
}else{
//只清除當(dāng)前序列號的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發(fā)布的消息"+message+"未被確認(rèn),序列號"+sequenceNumber);
};
/**
* 添加一個異步確認(rèn)的監(jiān)聽器
* 1.確認(rèn)收到消息的回調(diào)
* 2.未收到消息的回調(diào)
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()獲取下一個消息的序列號
* 通過序列號與消息體進(jìn)行一個關(guān)聯(lián)
* 全部都是未確認(rèn)的消息體
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + MESSAGE_COUNT + "個異步確認(rèn)消息,耗時" + (end - begin) +
"ms");
}
}
8.交換機(jī)
<1>交換機(jī)的認(rèn)識
1.1 概念
RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列。實際上,通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。
相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange),交換機(jī)工作的內(nèi)容非常簡單,一方面它接收來自生產(chǎn)者的消息,另一方面將它們推入隊列。交換機(jī)必須確切知道如何處理收到的消息。是應(yīng)該把這些消息放到特定隊列還是說把他們到許多隊列中還是說應(yīng)該丟棄它們。這就的由交換機(jī)的類型來決定。

1.2Exchanges 的類型
總共有以下類型:
直接(direct), 主題(topic) ,標(biāo)題(headers) , 扇出(fanout)
1.3無名Exchange
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
第一個參數(shù)是交換機(jī)的名稱。空字符串表示默認(rèn)或無名稱交換機(jī):消息能路由發(fā)送到隊列中其實
是由 routingKey(bindingkey)綁定 key 指定的,如果它存在的話
1.4臨時隊列
每當(dāng)我們連接到 Rabbit 時,我們都需要一個全新的空隊列,為此我們可以創(chuàng)建一個具有隨機(jī)名稱的隊列,或者能讓服務(wù)器為我們選擇一個隨機(jī)隊列名稱那就更好了。其次一旦我們斷開了消費(fèi)者的連接,隊列將被自動刪除。
String queueName = channel.queueDeclare().getQueue();
1.5隊列和交換機(jī)之間的綁定
String queueName = channel.queueDeclare().getQueue();
<2>交換機(jī)具體介紹
Fanout 刪除(廣播)
將接收到的所有消息廣播到它知道的所有隊列中。
Direct (直接)
將詳細(xì)發(fā)送到對應(yīng)路由鍵的隊列上去

在上面這張圖中,我們可以看到 X 綁定了兩個隊列,綁定類型是 direct。隊列 Q1 綁定鍵為 orange,隊列 Q2 綁定鍵有兩個:一個綁定鍵為 black,另一個綁定鍵為 green.
在這種綁定情況下,生產(chǎn)者發(fā)布消息到 exchange 上,綁定鍵為 orange 的消息會被發(fā)布到隊列Q1。綁定鍵為 blackgreen 和的消息會被發(fā)布到隊列 Q2,其他消息類型的消息將被丟棄。
綁定
//聲明交換機(jī)名稱及類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串 channel.queueBind(queueName, EXCHANGE_NAME, "");
Topics(主題)
- 盡管使用 direct 交換機(jī)改進(jìn)了我們的系統(tǒng),但是它仍然存在局限性-比方說我們想接收的日志類型有info.base 和 info.advantage,某個隊列只想 info.base 的消息,那這個時候 direct 就辦不到了。這個時候就只能使用 topic 類型
- 發(fā)送到類型是 topic 交換機(jī)的消息的 routing_key 不能隨意寫,必須滿足一定的要求,它**必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞。但這個單詞列表最多不能超過 255 個字節(jié)。
- 可以代替一個單詞
- 可以替代零個或多個單詞

9.死信隊列
<1>認(rèn)識死信隊列
概念
- 死信,顧名思義就是無法被消費(fèi)的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時候由于特定的原因?qū)е?queue中的某些消息無法被消費(fèi),這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。
- 應(yīng)用場景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊列機(jī)制,當(dāng)消息消費(fèi)發(fā)生異常時,將消息投入死信隊列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
來源
- 消息超出最大存活時間過期隊
- 列達(dá)到最大長度(隊列滿了,無法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false.
<2>死信實戰(zhàn)
2.1架構(gòu)圖

2.2TTL模擬死信隊列
生產(chǎn)者
public class Producer {
private static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) {
try {
Channel channel= RabbitMQChannelUtil.getChannel();
if(channel == null){
return;
}
//聲明交換機(jī)類型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//設(shè)置消息TTL時間
AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().expiration("1000").build();
//用作演示消息隊列的限制個數(shù)
for (int i = 0; i <10 ; i++) {
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,message.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
普通消費(fèi)者:啟動之后關(guān)閉,模擬接收不到消息
public class NormalConsumer {
//普通交換機(jī)名稱
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機(jī)名稱
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQChannelUtil.getChannel();
if(channel == null){
return;
}
//聲明死信和普通交換機(jī) 類型為 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明死信隊列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信隊列綁定死信交換機(jī)與 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常隊列綁定死信隊列信息
Map<String, Object> params = new HashMap<>();
//正常隊列設(shè)置死信交換機(jī) 參數(shù) key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常隊列設(shè)置死信 routing-key 參數(shù) key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("NormalConsumer 接收到消息"+message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
死信隊列消費(fèi)者
public class DeadConsumer {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQChannelUtil.getChannel();
if (channel == null) {
return;
}
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信隊列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("DeadConsumer 接收死信隊列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
另外兩種思路相同.
到此這篇關(guān)于RabbitMQ的基礎(chǔ)知識的文章就介紹到這了,更多相關(guān)RabbitMQ基礎(chǔ)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java創(chuàng)建student類詳細(xì)代碼例子
這篇文章主要給大家介紹了關(guān)于Java創(chuàng)建student類的相關(guān)資料,學(xué)生類(Student)是一種面向?qū)ο蟮木幊谈拍?其主要用于描述學(xué)生的屬性和行為,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-11-11
mybatis主從表關(guān)聯(lián)查詢,返回對象帶有集合屬性解析
這篇文章主要介紹了mybatis主從表關(guān)聯(lián)查詢,返回對象帶有集合屬性解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03


