RabbitMQ的基礎知識
RabbitMQ
1.對MQ的介紹
1.說明是MQ
MQ(message queue),從字面意思上看,本質是個隊列,F(xiàn)IFO 先入先出,只不過隊列中存放的內容是
message 而已,還是一種跨進程的通信機制,用于上下游傳遞消息。在互聯(lián)網架構中,MQ 是一種非常常
見的上下游“邏輯解耦+物理解耦”的消息通信服務。使用了 MQ 之后,消息發(fā)送上游只需要依賴 MQ,不
用依賴其他服務。
2.MQ的好處
1.流量消峰
舉個例子,如果訂單系統(tǒng)最多能處理一萬次訂單,這個處理能力應付正常時段的下單時綽綽有余,正常時段我們下單一秒后就能返回結果。但是在高峰期,如果有兩萬次下單操作系統(tǒng)是處理不了的,只能限制訂單超過一萬后不允許用戶下單。使用消息隊列做緩沖,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這時有些用戶可能在下單十幾秒后才能收到下單成功的操作,但是比不能下單的體驗要好。
2.應用解耦
以電商應用為例,應用中有訂單系統(tǒng)、庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng)。用戶創(chuàng)建訂單后,如果耦合調用庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng),任何一個子系統(tǒng)出了故障,都會造成下單操作異常。當轉變成基于消息隊列的方式后,系統(tǒng)間調用的問題會減少很多,比如物流系統(tǒng)因為發(fā)生故障,需要幾分鐘來修復。在這幾分鐘的時間里,物流系統(tǒng)要處理的內存被緩存在消息隊列中,用戶的下單操作可以正常完成。當物流系統(tǒng)恢復后,繼續(xù)處理訂單信息即可,用戶感受不到物流系統(tǒng)的故障,提升系統(tǒng)的可用性。
- 異步處理
有些服務間調用是異步的,例如 A 調用 B,B 需要花費很長時間執(zhí)行,但是 A 需要知道 B 什么時候可以執(zhí)行完,以前一般有兩種方式,A 過一段時間去調用 B 的查詢 api 查詢?;蛘?A 提供一個 callback api, B 執(zhí)行完之后調用 api 通知 A 服務。這兩種方式都不是很優(yōu)雅,使用消息總線,可以很方便解決這個問題,A 調用 B 服務后,只需要監(jiān)聽 B 處理完成的消息,當 B 處理完成后,會發(fā)送一條消息給 MQ,MQ 會將此消息轉發(fā)給 A 服務。這樣 A 服務既不用循環(huán)調用 B 的查詢 api,也不用提供 callback api。同樣 B 服務也不用做這些操作。A 服務還能及時的得到異步處理成功的消息。
2.RabbitMQ的六種模式 及工作原理
工作模式
依次是:hello world ,工作模式,發(fā)布訂閱模式,路由模式,主題模式,發(fā)布確認模式
工作原理
Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查詢表中,用于 message 的分發(fā)依據
依賴
<!--rabbitmq 依賴客戶端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
3.hello world隊列
1.生產者
public class Producer { //建立隊列 private static final String QUEUE_NAME="hello"; public static void main(String[] args) { //創(chuàng)建連接工場 ConnectionFactory factory=new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setUsername("guest"); try { //建立連接和信道 //channel 實現(xiàn)了自動 close 接口 自動關閉 不需要顯示關閉 Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); /** * 生成一個隊列,并將信道和隊列連接 * 1.隊列名稱 * 2.隊列里面的消息是否持久化 默認消息存儲在內存中 * 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費 * 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除 * 5.其他參數(shù) */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world"; /** * 發(fā)送一個消息 * 1.發(fā)送到那個交換機 * 2.路由的 key 是哪個 * 3.其他的參數(shù)信息 * 4.發(fā)送消息的消息體 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息發(fā)送成功"); }catch (Exception e){ e.printStackTrace(); } } }
消費者
public class Consumer { //定義隊列名 private static final String QUEUE_NAME="hello"; public static void main(String[] args) { //建立連接和信道 try { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); System.out.println("等待接收消息"); /** *1.同一個會話, consumerTag 是固定的 可以做此會話的名字, deliveryTag 每次接收消息+1,可以做此消息處理通道的名字。 *2.包含消息的字節(jié)形式的類 */ DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消費被取消"); }; /* 消費者消費消息 * 1.消費哪個隊列 * 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答 * 3.消費者未成功消費的回調 * 4.消費者取消消費的回調 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }catch (Exception e){ e.printStackTrace(); } } }
4.工作隊列模式
生產者
public class Producer { public static void main(String[] args) throws IOException, InterruptedException { Channel channel=RabbitMQChannelUtil.getChannel(); if(channel == null){ System.out.println("失敗"); return; } channel.queueDeclare(RabbitMQChannelUtil.QUEUE_NAME,false,false,false,null); int i=0; while (true){ String message="消息"+i; i++; /** * 發(fā)送一個消息 * 1.發(fā)送到那個交換機 * 2.路由的 key 是哪個 * 3.其他的參數(shù)信息 * 4.發(fā)送消息的消息體 */ channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME,null,message.getBytes()); System.out.println(message); Thread.sleep(500); } } }
消費者
public class Consumer { public static void main(String[] args) { Channel channel=RabbitMQChannelUtil.getChannel(); if(channel == null){ System.out.println("消費失敗"); return; } DeliverCallback deliverCallback=(consumerTag, delivery)->{ String message=new String(delivery.getBody()); System.out.println(Thread.currentThread().getName()+"消費了"+message); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消費被取消"); }; Thread[] threads=new Thread[5]; for (int i = 0; i <threads.length ; i++) { threads[i]=new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"啟動等待消費"); channel.basicConsume(RabbitMQChannelUtil.QUEUE_NAME,true,deliverCallback,cancelCallback); } catch (IOException e) { e.printStackTrace(); } }); } for (int i = 0; i <threads.length ; i++) { threads[i].start(); } } }
5.消息應答機制
認識
消費者處理消息時,可能在處理過程中掛掉,那么消息就會丟失為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了rabbitmq 可以把該消息刪除了。
自動應答
消息發(fā)送后立即被認為已經傳送成功,這種模式需要在高吞吐量和數(shù)據傳輸安全性方面做權衡,因為這種模式如果消息在接收到之前,消費者那邊出現(xiàn)連接或者 channel 關閉,那么消息就丟失了
手動應答
- Channel.basicAck(用于肯定確認)
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
- Channel.basicNack(用于否定確認)
- Channel.basicReject(用于否定確認)
與 Channel.basicNack 相比少一個參數(shù),不處理該消息了直接拒絕,可以將其丟棄了
Channel.basicNack參數(shù)中Multiple(批量應答) 的解釋
multiple 的 true 和 false 代表不同意思
- true 代表批量應答 channel 上未應答的消息
比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時
5-8 的這些還未應答的消息都會被確認收到消息應答
- false
只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答
消息手動應答的代碼
- 將手動應答開啟
/* 消費者消費消息 * 1.消費哪個隊列 * 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答 * 3.當一個消息發(fā)送過來后的回調接口 * 4.消費者取消消費的回調 */ boolean ack=false; channel.basicConsume(QUEUE_NAME,ack,deliverCallback,cancelCallback);
- 消息消費回調時,使用手動應答
/** * 消息發(fā)送過來后的回調接口 *1.同一個會話, consumerTag 是固定的 可以做此會話的名字, deliveryTag 每次接收消息+1,可以做此消息處理通道的名字。 *2.消息類 */ DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody()); System.out.println(message); /** * 參數(shù)說明 * 1.消息的標記tag * 2.是否批量應答 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };
消息自動進行重新入隊
如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息未發(fā)送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發(fā)給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息。
6.RabbitMQ的持久化,不公平分發(fā)及預取值
概念
剛剛我們已經看到了如何處理任務不丟失的情況,但是如何保障當 RabbitMQ 服務停掉以后消息生產者發(fā)送過來的消息不丟失。默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久化。
隊列持久化
- 之前我們創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的化,該隊列就會被刪除掉,如果要隊列實現(xiàn)持久化 需要在聲明隊列的時候把 durable(第二個) 參數(shù)設置為持久化
- 但是需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤
channel.queueDeclare(RabbitMQChannelUtil.QUEUE_NAME,true,false,false,null);
這個就是持久化隊列
消息持久化
- 要想讓消息實現(xiàn)持久化需要在消息生產者修改代碼,MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性。
隊列持久化為false時:
channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME,null,message.getBytes());
隊列持久化為true時
channel.basicPublish("",RabbitMQChannelUtil.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
- 將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是
這里依然存在當消息剛準備存儲在磁盤的時候 但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒
有真正寫入磁盤。持久性保證并不強.更強的持久化后面發(fā)布確認
會講到
不公平分發(fā)
在最開始的時候我們學習到 RabbitMQ 分發(fā)消息采用的輪訓分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非???,而另外一個消費者 2處理速度卻很慢,這個時候我們還是采用輪訓分發(fā)的化就會到這處理速度快的這個消費者很大一部分時間處于空閑狀態(tài),而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是RabbitMQ 并不知道這種情況它依然很公平的進行分發(fā)。
為了避免這種情況,我們設置不公平分發(fā):
channel.basicQos(1);
預取值
本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel 上肯定不止只有一個消息另外來自消費者的手動確認本質上也是異步的。因此這里就存在一個未確認的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認消息問題。這個時候就可以通過使用 basic.qos 方法設置“預取計數(shù)”值來完成的。該值定義通道上允許的未確認消息的最大數(shù)量。一旦數(shù)量達到配置的數(shù)量,RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認
prefetch就是預取值數(shù)
7.發(fā)布確認
上文持久化中提到,當消息持久化存入RabbitMQ磁盤時,RabbitMQ突然宕機,則消息未成功存入,會發(fā)生消息丟失。所以發(fā)布確認即:在消息成功存入磁盤時,返還給生產者一個消息,確認已經存入磁盤
具體介紹
生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式,所有在該信道上面發(fā)布的消息都將會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發(fā)送一個確認給生產者(包含消息的唯一 ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發(fā)出,broker 回傳
給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也可以設置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經得到了處理。
為了保證消息不丟失:
- 開啟隊列持久化
- 開啟消息持久化
- 開啟信道的發(fā)布確認
開啟發(fā)布確認的方法
channel.confirmSelect();
發(fā)布確認的模式
單個確認發(fā)布
public static void singleConfirm(){ try { Channel channel=RabbitMQChannelUtil.getChannel(); if(channel == null){ System.out.println("信道建立失敗"); return; } //開啟發(fā)布確認 channel.confirmSelect(); long begin=System.currentTimeMillis(); for (int i = 0; i <MESSAGE_COUNT ; i++) { String message=i+""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); //可以加時間參數(shù),當消息發(fā)送失敗或超過參數(shù)時間沒成功,則返回false boolean flag=channel.waitForConfirms(); //如果失敗可以重發(fā) if(flag){ System.out.println(message+"發(fā)送成功"); }else { //這里可以實現(xiàn)重發(fā) System.out.println(message+"發(fā)送失敗"); } } long end=System.currentTimeMillis(); System.out.println("發(fā)送"+MESSAGE_COUNT+"條消息,耗時"+(end-begin)+"ms"); }catch (Exception e){ e.printStackTrace(); } }
發(fā)布一個消息之后只有它被確認發(fā)布,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個方法只有在消息被確認的時候才返回,如果在指定時間范圍內這個消息沒有被確認那么它將拋出異常。
缺點:速度太慢
2.批量發(fā)布確認模式
public static void batchConfirm(){ try { Channel channel=RabbitMQChannelUtil.getChannel(); if(channel == null){ System.out.println("建立連接失敗"); return; } channel.queueDeclare(QUEUE_NAME, false, false, false, null); //當100條消息發(fā)布成功時,再確認 int ackMessageCount=100; //未確認的消息個數(shù) int needAckMessageCount=0; //開啟發(fā)布確認 channel.confirmSelect(); long begin=System.currentTimeMillis(); for (int i = 0; i <MESSAGE_COUNT ; i++) { String message=i+""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); needAckMessageCount++; if(needAckMessageCount == ackMessageCount){ //確認 channel.waitForConfirms(); needAckMessageCount=0; } } //判斷可能還有消息未發(fā)送,再發(fā)送依次 if(needAckMessageCount > 0){ channel.waitForConfirms(); } long end= System.currentTimeMillis(); System.out.println("發(fā)送"+MESSAGE_COUNT+"條消息,耗時"+(end-begin)+"ms"); }catch (Exception e){ e.printStackTrace(); } }
缺點:當發(fā)生故障導致發(fā)布出現(xiàn)問題時,不知道是哪個消息出現(xiàn)問題
3.異步確認發(fā)布
原理
有單獨一個隊列保存確認信號
public static void asyncConfirm() throws Exception { try (Channel channel = RabbitMQChannelUtil.getChannel()) { if(channel == null){ return; } channel.queueDeclare(QUEUE_NAME, false, false, false, null); //開啟發(fā)布確認 channel.confirmSelect(); /** * 線程安全有序的一個哈希表,適用于高并發(fā)的情況 * 1.輕松的將序號與消息進行關聯(lián) * 2.輕松批量刪除條目 只要給到序列號 * 3.支持并發(fā)訪問 */ ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); /** * 確認收到消息的一個回調 * 1.消息序列號 * 2.true 可以確認小于等于當前序列號的消息 * false 確認當前序列號消息 */ ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { //返回的是小于等于當前序列號的未確認消息 是一個 map ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true); //清除該部分未確認消息 confirmed.clear(); }else{ //只清除當前序列號的消息 outstandingConfirms.remove(sequenceNumber); } }; ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = outstandingConfirms.get(sequenceNumber); System.out.println("發(fā)布的消息"+message+"未被確認,序列號"+sequenceNumber); }; /** * 添加一個異步確認的監(jiān)聽器 * 1.確認收到消息的回調 * 2.未收到消息的回調 */ channel.addConfirmListener(ackCallback, null); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i; /** * channel.getNextPublishSeqNo()獲取下一個消息的序列號 * 通過序列號與消息體進行一個關聯(lián) * 全部都是未確認的消息體 */ outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("發(fā)布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) + "ms"); } }
8.交換機
<1>交換機的認識
1.1 概念
RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發(fā)送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。
相反,生產者只能將消息發(fā)送到交換機(exchange),交換機工作的內容非常簡單,一方面它接收來自生產者的消息,另一方面將它們推入隊列。交換機必須確切知道如何處理收到的消息。是應該把這些消息放到特定隊列還是說把他們到許多隊列中還是說應該丟棄它們。這就的由交換機的類型來決定。
1.2Exchanges 的類型
總共有以下類型:
直接(direct), 主題(topic) ,標題(headers) , 扇出(fanout)
1.3無名Exchange
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
第一個參數(shù)是交換機的名稱??兆址硎灸J或無名稱交換機:消息能路由發(fā)送到隊列中其實
是由 routingKey(bindingkey)綁定 key 指定的,如果它存在的話
1.4臨時隊列
每當我們連接到 Rabbit 時,我們都需要一個全新的空隊列,為此我們可以創(chuàng)建一個具有隨機名稱的隊列,或者能讓服務器為我們選擇一個隨機隊列名稱那就更好了。其次一旦我們斷開了消費者的連接,隊列將被自動刪除。
String queueName = channel.queueDeclare().getQueue();
1.5隊列和交換機之間的綁定
String queueName = channel.queueDeclare().getQueue();
<2>交換機具體介紹
Fanout 刪除(廣播)
將接收到的所有消息廣播到它知道的所有隊列中。
Direct (直接)
將詳細發(fā)送到對應路由鍵的隊列上去
在上面這張圖中,我們可以看到 X 綁定了兩個隊列,綁定類型是 direct。隊列 Q1 綁定鍵為 orange,隊列 Q2 綁定鍵有兩個:一個綁定鍵為 black,另一個綁定鍵為 green.
在這種綁定情況下,生產者發(fā)布消息到 exchange 上,綁定鍵為 orange 的消息會被發(fā)布到隊列Q1。綁定鍵為 blackgreen 和的消息會被發(fā)布到隊列 Q2,其他消息類型的消息將被丟棄。
綁定
//聲明交換機名稱及類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串 channel.queueBind(queueName, EXCHANGE_NAME, "");
Topics(主題)
- 盡管使用 direct 交換機改進了我們的系統(tǒng),但是它仍然存在局限性-比方說我們想接收的日志類型有info.base 和 info.advantage,某個隊列只想 info.base 的消息,那這個時候 direct 就辦不到了。這個時候就只能使用 topic 類型
- 發(fā)送到類型是 topic 交換機的消息的 routing_key 不能隨意寫,必須滿足一定的要求,它**必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞。但這個單詞列表最多不能超過 255 個字節(jié)。
- 可以代替一個單詞
- 可以替代零個或多個單詞
9.死信隊列
<1>認識死信隊列
概念
- 死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由于特定的原因導致 queue中的某些消息無法被消費,這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。
- 應用場景:為了保證訂單業(yè)務的消息數(shù)據不丟失,需要使用到 RabbitMQ 的死信隊列機制,當消息消費發(fā)生異常時,將消息投入死信隊列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
來源
- 消息超出最大存活時間過期隊
- 列達到最大長度(隊列滿了,無法再添加數(shù)據到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false.
<2>死信實戰(zhàn)
2.1架構圖
2.2TTL模擬死信隊列
生產者
public class Producer { private static final String NORMAL_EXCHANGE="normal_exchange"; public static void main(String[] args) { try { Channel channel= RabbitMQChannelUtil.getChannel(); if(channel == null){ return; } //聲明交換機類型 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //設置消息TTL時間 AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder().expiration("1000").build(); //用作演示消息隊列的限制個數(shù) for (int i = 0; i <10 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,message.getBytes()); System.out.println("生產者發(fā)送消息"); } }catch (Exception e){ e.printStackTrace(); } } }
普通消費者:啟動之后關閉,模擬接收不到消息
public class NormalConsumer { //普通交換機名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQChannelUtil.getChannel(); if(channel == null){ return; } //聲明死信和普通交換機 類型為 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //聲明死信隊列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信隊列綁定死信交換機與 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常隊列綁定死信隊列信息 Map<String, Object> params = new HashMap<>(); //正常隊列設置死信交換機 參數(shù) key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常隊列設置死信 routing-key 參數(shù) key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("NormalConsumer 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
死信隊列消費者
public class DeadConsumer { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQChannelUtil.getChannel(); if (channel == null) { return; } channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信隊列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("DeadConsumer 接收死信隊列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); }
另外兩種思路相同.
到此這篇關于RabbitMQ的基礎知識的文章就介紹到這了,更多相關RabbitMQ基礎內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
mybatis主從表關聯(lián)查詢,返回對象帶有集合屬性解析
這篇文章主要介紹了mybatis主從表關聯(lián)查詢,返回對象帶有集合屬性解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03