Java中RabbitMQ消息隊(duì)列的交換機(jī)詳解
RabbitMQ交換機(jī)
交換機(jī)屬性
- Name:交換機(jī)名稱(chēng)
- Type:交換機(jī)類(lèi)型 direct、topic、fanout、headers
- Durability:是否需要持久化,true為持久化
- Auto Delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列刪除后,自動(dòng)刪除該Exchange
- Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
- Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議,定制化使用
直流交換機(jī)
直連交換機(jī)Direct Exchange(完全匹配路由key)
所有發(fā)送到Direct Exchange的消息會(huì)被轉(zhuǎn)發(fā)到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄;
消費(fèi)端代碼
package com.xieminglu.rabbitmqapi.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示聲明了一個(gè)交換機(jī) channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示聲明了一個(gè)隊(duì)列 channel.queueDeclare(queueName, false, false, false, null); //建立一個(gè)綁定關(guān)系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author 小李飛刀 * @site www.javaxl.com * @company * @create 2019-11-18 10:22 */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; // String routingKey = "test.direct111"; //收不到 //5 發(fā)送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
代碼的區(qū)別: 一條消息只會(huì)發(fā)送在一個(gè)隊(duì)列里
創(chuàng)建一個(gè)交換機(jī)與隊(duì)列
所綁定的交換機(jī)
控制臺(tái)輸出
主題交換機(jī)
主題交換機(jī)Topic Exchange(匹配路由規(guī)則的交換機(jī))
所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)系RouteKey中指定Topic的Queue上;
Exchange將RouteKey和某Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic;
注意:可以使用通配符進(jìn)行模糊匹配
- 符號(hào):“#” 匹配一個(gè)或者多個(gè)詞
- 符號(hào):“” 匹配不多不少一個(gè)詞
列如:
- “log.#” 能夠匹配到 “log.info.oa”
- “log.” 能夠匹配到 “log.err”
消費(fèi)端代碼
package com.xieminglu.rabbitmqapi.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.#"; // String routingKey = "user.*"; // 1 聲明交換機(jī) channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 聲明隊(duì)列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交換機(jī)和隊(duì)列的綁定關(guān)系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 發(fā)送 String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
代碼的區(qū)別: 一條消息會(huì)發(fā)送在多個(gè)隊(duì)列里 消費(fèi)端:
生產(chǎn)端:
控制臺(tái)輸出
并且可以同時(shí)綁定多個(gè)交換機(jī)
輸出交換機(jī)
輸出交換機(jī)Fanout Exchange(不做路由)
- 不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上;
- 發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上;
- Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的
消費(fèi)端代碼
package com.xieminglu.rabbitmqapi.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不設(shè)置路由鍵 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_fanout_exchange"; //5 發(fā)送 for(int i = 0; i < 10; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消費(fèi)端:
生產(chǎn)端:
控制臺(tái)輸出
Binding-綁定
- Exchange和Exchange、Queue之間的連接關(guān)系;
- Binding中可以包含RoutingKey或者參數(shù)
Queue-消息隊(duì)列
- 消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)
- Durability:是否持久化
- Durable:是,Transient:否
- Auto delete:如選yes,代表當(dāng)最后一個(gè)監(jiān)聽(tīng)被移除之后,該Queue會(huì)自動(dòng)被刪除
Message-消息
- 服務(wù)器和應(yīng)用程序之間傳遞的數(shù)據(jù)
- 本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成
- 常用屬性:delivery model、headers(自定義屬性)
Message-其他屬性
- content_type、content_encoding、priority
- correlation_id、reply_to、expiration、message_id
- Timestamp、type、user_id、app_id、cluster_id
Virtual host-虛擬主機(jī)
- 虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由
- 一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue
- 同一個(gè)Virtual Host里面不能有相同名稱(chēng)的Exchange或Queue
總結(jié)一下
交換機(jī)的概念
在沒(méi)有交換機(jī)的時(shí)候,我們的消息隊(duì)列會(huì)處理所有發(fā)給這個(gè)消息隊(duì)列的消息,然后由消費(fèi)者一個(gè)一個(gè)消費(fèi)這個(gè)隊(duì)列里面的消息,如果由集群的話(huà)還會(huì)分?jǐn)倢?duì)這個(gè)消息隊(duì)列的處理。只不過(guò)這里面有一個(gè)
Message acknowledgment的概念
這將會(huì)導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會(huì)越來(lái)越多
當(dāng)然一般的消息中間件都不會(huì)這么干,我們使用了交換機(jī)后,我們看到我們的三種策略,其實(shí)都可以說(shuō)由交換機(jī)去找跟它所綁定的消息隊(duì)列,如果生產(chǎn)端的路由鍵不符合要求或找不到消息隊(duì)列定好的路由鍵的話(huà)就會(huì)進(jìn)行其他處理。
到此這篇關(guān)于Java中RabbitMQ消息隊(duì)列的交換機(jī)詳解的文章就介紹到這了,更多相關(guān)RabbitMQ交換機(jī)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringMVC配置多個(gè)properties文件之通配符解析
這篇文章主要介紹了SpringMVC配置多個(gè)properties文件之通配符解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09java實(shí)現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例
這篇文章主要介紹了java實(shí)現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例,需要的朋友可以參考下2014-04-04Struts1簡(jiǎn)介和入門(mén)_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了Struts1簡(jiǎn)介和入門(mén)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09Intellj?idea新建的java源文件夾不是藍(lán)色的圖文解決辦法
idea打開(kāi)java項(xiàng)目后新建的模塊中,java文件夾需要變成藍(lán)色,這篇文章主要給大家介紹了關(guān)于Intellj?idea新建的java源文件夾不是藍(lán)色的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-02-02雙token實(shí)現(xiàn)token超時(shí)策略示例
用于restful的app應(yīng)用無(wú)狀態(tài)無(wú)sesion登錄示例,需要的朋友可以參考下2014-02-02mybatis使用foreach查詢(xún)不出結(jié)果也不報(bào)錯(cuò)的問(wèn)題
這篇文章主要介紹了mybatis使用foreach查詢(xún)不出結(jié)果也不報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03解決BigDecimal轉(zhuǎn)long丟失精度的問(wèn)題
這篇文章主要介紹了解決BigDecimal轉(zhuǎn)long丟失精度的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12Spring Boot啟動(dòng)過(guò)程(六)之內(nèi)嵌Tomcat中StandardHost、StandardContext和Sta
這篇文章主要介紹了Spring Boot啟動(dòng)過(guò)程(六)之內(nèi)嵌Tomcat中StandardHost、StandardContext和StandardWrapper的啟動(dòng)教程詳解,需要的朋友可以參考下2017-04-04解決idea update project 更新選項(xiàng)消失的問(wèn)題
這篇文章主要介紹了解決idea update project 更新選項(xiàng)消失的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01如何解決springboot讀取配置文件的中文亂碼問(wèn)題
本篇文章主要介紹了如何解決springboot讀取配置文件的中文亂碼問(wèn)題,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看2018-05-05