實(shí)戰(zhàn)干貨之基于SpringBoot的RabbitMQ多種模式隊(duì)列
環(huán)境準(zhǔn)備
安裝RabbitMQ
由于RabbitMQ的安裝比較簡(jiǎn)單,這里不再贅述??勺孕械焦倬W(wǎng)下載http://www.rabbitmq.com/download.html
依賴
SpringBoot項(xiàng)目導(dǎo)入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
連接配置
配置文件添加如下配置(根據(jù)自身情況修改配置)
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.virtual-host=acelin
五種隊(duì)列模式實(shí)現(xiàn)
1 點(diǎn)對(duì)點(diǎn)的隊(duì)列
在java配置文件DirectRabbitConfig
中先聲明一個(gè)隊(duì)列用于接收信息
public static final String PEER_TO_PEER_QUEUE = "peer-to-peer-queue"; // 點(diǎn)對(duì)點(diǎn)隊(duì)列 /******************************* Peer-to-peer ******************************/ @Bean public Queue peer2peerQueue() { return new Queue(PEER_TO_PEER_QUEUE,true); }
創(chuàng)建一個(gè)消費(fèi)者類Peer2PeerConsumers
。用@RabbitListener
對(duì)聲明的隊(duì)列進(jìn)行監(jiān)聽(tīng)
@Component public class Peer2PeerConsumers extends Base { @RabbitListener(queues = DirectRabbitConfig.PEER_TO_PEER_QUEUE) public void consumer2(Object testMessage) { logger.debug("peer-to-peer消費(fèi)者收到消息 : " + testMessage.toString()); } }
創(chuàng)造一個(gè)消息生產(chǎn)者。在編碼形式上,直接把消息發(fā)發(fā)送給接收的消息隊(duì)列
/** * 【點(diǎn)對(duì)點(diǎn)模式】 * @param task 消息內(nèi)容 **/ @PostMapping("/peer-to-peer/{task}") public String peerToPeer(@PathVariable("task") String task){ rabbitTemplate.convertAndSend(DirectRabbitConfig.PEER_TO_PEER_QUEUE,task); return "ok"; }
啟動(dòng)項(xiàng)目。隊(duì)列綁定到默認(rèn)交換機(jī)
調(diào)用生產(chǎn)者接口產(chǎn)生消息,可看到的消費(fèi)者立即接收到信息
peer-to-peer消費(fèi)者收到消息 : (Body:'hi mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=peer-to-peer-queue, deliveryTag=1, consumerTag=amq.ctag-vuKWCYLNLn3GwRJKJO5-Mg, consumerQueue=peer-to-peer-queue])
這里要說(shuō)明一點(diǎn)的是,點(diǎn)對(duì)點(diǎn)模式雖然編碼形式只與隊(duì)列交互,但其本質(zhì)上還是要跟交換機(jī)交互的,本質(zhì)跟下面要介紹的路由模式其實(shí)是一樣的。
查看convertAndSend
方法的源碼,可以看到我們雖然沒(méi)有進(jìn)行交換機(jī)和隊(duì)列的綁定,發(fā)送消息是也沒(méi)指定交換機(jī),但是程序會(huì)為我們綁定默認(rèn)的交換機(jī)。
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
默認(rèn)交換機(jī)會(huì)隱式綁定到每個(gè)隊(duì)列,路由鍵等于隊(duì)列名稱。我們無(wú)法明確綁定到默認(rèn)交換機(jī)或從默認(rèn)交換中解除綁定。它也無(wú)法刪除。
且我們第一個(gè)參數(shù)傳遞的是隊(duì)列的名稱,但實(shí)際上程序是以這個(gè)名字作為路由,將同名隊(duì)列跟默認(rèn)交換機(jī)做綁定。所以的消息會(huì)根據(jù)該路由信息,通過(guò)默認(rèn)交換機(jī)分發(fā)到同名隊(duì)列上。(我們通過(guò)接收的信息receivedRoutingKey=peer-to-peer-queue
和consumerQueue=peer-to-peer-queue
也可以看的出來(lái))
2 工作隊(duì)列模式Work Queue
在java配置文件DirectRabbitConfig
中先聲明一個(gè)工作隊(duì)列
public static final String WORK_QUEUE = "work-queue"; // 工作隊(duì)列 /******************************* Work Queue ******************************/ @Bean public Queue workQueue() { return new Queue(WORK_QUEUE,true); }
創(chuàng)建一個(gè)消費(fèi)者類WorkConsumers
。同樣用@RabbitListener
對(duì)聲明的隊(duì)列進(jìn)行監(jiān)聽(tīng)
@Component public class WorkConsumers extends Base { @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE) public void consumer1(Object testMessage) { logger.debug("work消費(fèi)者[1]收到消息 : " + testMessage.toString()); } @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE) public void consumer2(Object testMessage) { logger.debug("work消費(fèi)者[2]收到消息 : " + testMessage.toString()); } }
創(chuàng)造一個(gè)消息生產(chǎn)者。在編碼形式上,直接把消息發(fā)發(fā)送給接收的消息隊(duì)列
/** * 【工作隊(duì)列模式】 * @param task 消息內(nèi)容 **/ @PostMapping("/work/{task}") public String sendWorkMessage(@PathVariable("task") String task){ rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task); return "ok"; }
啟動(dòng)項(xiàng)目,同樣的,工作隊(duì)列也是綁定到默認(rèn)交換機(jī)。
調(diào)用生產(chǎn)者接口連續(xù)發(fā)送幾次消息,可看到兩個(gè)消費(fèi)者競(jìng)爭(zhēng)對(duì)隊(duì)列消息進(jìn)行消費(fèi),一條消息只被一個(gè)消費(fèi)者消費(fèi),不會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,因此工作隊(duì)列模式也被稱為競(jìng)爭(zhēng)消費(fèi)者模式。
- work消費(fèi)者[1]收到消息 : (Body:'task1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])
- work消費(fèi)者[2]收到消息 : (Body:'task2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=1, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])
- work消費(fèi)者[1]收到消息 : (Body:'task3' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-PUYjfVq56aEn-7a9DzLNzQ, consumerQueue=work-queue])
- work消費(fèi)者[2]收到消息 : (Body:'task4' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=work-queue, deliveryTag=2, consumerTag=amq.ctag-1IVtDalFUCKVvYpFr_GF8A, consumerQueue=work-queue])
事實(shí)上,競(jìng)爭(zhēng)消費(fèi)者模式本質(zhì)就是多個(gè)消費(fèi)者對(duì)同一個(gè)隊(duì)列消息進(jìn)行消費(fèi)。另外,與點(diǎn)對(duì)點(diǎn)模式一樣,工作隊(duì)列模式的也是用到了默認(rèn)交換機(jī)進(jìn)行消息分發(fā)。因此于基于的Direct交換機(jī)的路由模式的原理本質(zhì)上都是一樣的,因此,某種程度上,我們也可以用路由模式實(shí)現(xiàn)工作隊(duì)列模式,這點(diǎn)我們下面介紹路由模式再進(jìn)行展開(kāi)
3 路由模式Routing
在java配置文件DirectRabbitConfig
中先聲明2個(gè)隊(duì)列和一個(gè)direct類型的交換機(jī),然后將隊(duì)列1和與交換機(jī)用一個(gè)路由鍵1進(jìn)行綁定,隊(duì)列2用路由鍵2與隊(duì)列進(jìn)行綁定
public static final String DIRECT_QUEUE_ONE = "directQueue-1"; // Direct隊(duì)列名稱1 public static final String DIRECT_QUEUE_TWO = "directQueue-2"; // Direct隊(duì)列名稱2 public static final String MY_DIRECT_EXCHANGE = "myDirectExchange"; // Direct交換機(jī)名稱 public static final String ROUTING_KEY_ONE = "direct.routing-key-1"; // direct路由標(biāo)識(shí)1 public static final String ROUTING_KEY_ONE = "direct.routing-key-2"; // direct路由標(biāo)識(shí)2 /******************************* Direct ******************************/ @Bean public Queue directQueueOne() { return new Queue(DIRECT_QUEUE_ONE,true); } @Bean public Queue directQueueTwo() { return new Queue(DIRECT_QUEUE_TWO,true); } @Bean public DirectExchange directExchange() { return new DirectExchange(MY_DIRECT_EXCHANGE,true,false); } @Bean public Binding bindingDirectOne() { return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(ROUTING_KEY_ONE); } @Bean public Binding bindingDirectTwo() { return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(ROUTING_KEY_TWO); }
創(chuàng)建一個(gè)消費(fèi)者類DirectConsumers
。在每個(gè)消費(fèi)者上,我們用3個(gè)消費(fèi)者注解@RabbitListener
對(duì)聲明的隊(duì)列進(jìn)行監(jiān)聽(tīng)。消費(fèi)者1和3監(jiān)聽(tīng)隊(duì)列1,消費(fèi)者2監(jiān)聽(tīng)隊(duì)列2
@Component public class DirectConsumers extends Base { @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE) public void consumer1(Object testMessage) { logger.debug("Direct消費(fèi)者[1]收到消息 : " + testMessage.toString()); } @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_TWO) public void consumer2(Object testMessage) { logger.debug("Direct消費(fèi)者[2]收到消息 : " + testMessage.toString()); } @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE_ONE) public void consumer3(Object testMessage) { logger.debug("Direct消費(fèi)者[3]收到消息 : " + testMessage.toString()); } }
創(chuàng)造一個(gè)消息生產(chǎn)者。發(fā)送消息時(shí),帶上路由鍵1信息
/** * 【Direct路由模式】 * @param message 消息內(nèi)容 **/ @PostMapping("/direct/{message}") public String sendDirectMessage(@PathVariable("message") String message) { Map<String, Object> map = new HashMap<>(); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", message); /* 設(shè)置路由標(biāo)識(shí)MY_ROUTING_KEY,發(fā)送到交換機(jī)MY_DIRECT_EXCHANGE */ rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE,DirectRabbitConfig.ROUTING_KEY_ONE, map); return "ok"; }
啟動(dòng)項(xiàng)目,查看該交換機(jī)的綁定情況
發(fā)送多條信息,可以看到,由于隊(duì)列2沒(méi)有通過(guò)路由鍵1跟交換機(jī)進(jìn)行綁定,所以對(duì)于監(jiān)控隊(duì)列2的消費(fèi)者2,其無(wú)法結(jié)束到的帶有路由鍵1的消息,而消費(fèi)者1和3則競(jìng)爭(zhēng)消費(fèi)隊(duì)列1的消息
- Direct消費(fèi)者[3]收到消息 : (Body:'{messageId=54682b16-0142-46af-be0c-1156df1f27a7, messageData=msg-1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=15, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])
- Direct消費(fèi)者[1]收到消息 : (Body:'{messageId=66cd296a-9a60-4458-8e87-72ed13f9964b, messageData=msg-2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=2, consumerTag=amq.ctag-hWmdY04YuLL0O2rgeSlxsw, consumerQueue=directQueue-1])
- Direct消費(fèi)者[3]收到消息 : (Body:'{messageId=48c0830e-2207-47ec-bd3e-a958fec48118, messageData=msg-3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myDirectExchange, receivedRoutingKey=direct.routing-key-1, deliveryTag=16, consumerTag=amq.ctag-CsuZL9KKByH9IDtqTKe-fg, consumerQueue=directQueue-1])
我們?nèi)绻麑?duì)新增一個(gè)隊(duì)列3,通過(guò)路由鍵1與交換機(jī)進(jìn)行綁定,消費(fèi)者獨(dú)立監(jiān)聽(tīng)隊(duì)列3,那么我們不難猜到,隊(duì)列3將和隊(duì)列1同樣拿到一條消息,相當(dāng)于廣播的概念,但我們會(huì)發(fā)現(xiàn)如果要這么做,似乎路由鍵無(wú)足輕重,因此rabbitmq提供了一種特殊的交換機(jī)來(lái)處理這種場(chǎng)景,不需要路由鍵的參與。我們接著往下看
4 發(fā)布/訂閱模式Publish/Subscribe
在java配置文件DirectRabbitConfig
中先聲明Fanout交換機(jī)和兩隊(duì)列,并將兩個(gè)隊(duì)列與該交換機(jī)進(jìn)行綁定
public static final String MY_FANOUT_EXCHANGE = "myFanoutExchange"; // Fanout交換機(jī)名稱 public static final String FANOUT_QUEUE_ONE = "fanout-queue-1"; // Fanout隊(duì)列名稱1 public static final String FANOUT_QUEUE_TWO = "fanout-queue-2"; // Fanout隊(duì)列名稱2 /******************************* Fanout ******************************/ @Bean public Queue fanoutQueueOne() { return new Queue(FANOUT_QUEUE_ONE,true); } @Bean public Queue fanoutQueueTwo() { return new Queue(FANOUT_QUEUE_TWO,true); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(MY_FANOUT_EXCHANGE,true,false); } @Bean public Binding bindingFanoutOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } @Bean public Binding bindingFanoutTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); }
創(chuàng)建一個(gè)消費(fèi)者類FanoutConsumers
。創(chuàng)建兩個(gè)消費(fèi)者,分表對(duì)兩個(gè)隊(duì)列進(jìn)行監(jiān)聽(tīng)
@Component public class FanoutConsumers extends Base { @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_ONE) public void consumer1(Object testMessage) { logger.debug("FANOUT消費(fèi)者[1]收到消息 : " + testMessage.toString()); } @RabbitListener(queues = DirectRabbitConfig.FANOUT_QUEUE_TWO) public void consumer2(Object testMessage) { logger.debug("FANOUT消費(fèi)者[2]收到消息 : " + testMessage.toString()); } }
創(chuàng)造一個(gè)消息生產(chǎn)者。將消息發(fā)送給Fanout交換機(jī)
/** * 【工作隊(duì)列模式】 * @param task 消息內(nèi)容 **/ @PostMapping("/work/{task}") public String sendWorkMessage(@PathVariable("task") String task){ rabbitTemplate.convertAndSend(DirectRabbitConfig.WORK_QUEUE,task); return "ok"; }
啟動(dòng)項(xiàng)目,我們可以看到交換機(jī)與兩個(gè)隊(duì)列進(jìn)行了綁定,但是路由鍵那一欄是空的。
發(fā)送兩條消息。
/** * 【Fanout發(fā)布訂閱模式】 * @param message 消息內(nèi)容 **/ @PostMapping("/fanout/{message}") public String sendFanoutMessage(@PathVariable("message") String message) { Map<String, Object> map = new HashMap<>(); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", message); /* 直接跟交換機(jī)MY_FANOUT_EXCHANGE交互 */ rabbitTemplate.setExchange(DirectRabbitConfig.MY_FANOUT_EXCHANGE); rabbitTemplate.convertAndSend(map); return "ok"; }
可以看到,兩個(gè)消費(fèi)者都拿到了同樣的數(shù)據(jù),達(dá)到了廣播的效果。
- FANOUT消費(fèi)者[2]收到消息 : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])
- FANOUT消費(fèi)者[1]收到消息 : (Body:'{messageId=a4bf1931-1db8-4cb9-8b01-397f43a82660, messageData=Hi Fanout}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])
- FANOUT消費(fèi)者[1]收到消息 : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-zR3Oi0MVESq8qushlAMa3Q, consumerQueue=fanout-queue-1])
- FANOUT消費(fèi)者[2]收到消息 : (Body:'{messageId=51f66720-35dd-4abf-9d33-24acf7786ed8, messageData=666}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myFanoutExchange, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-ncVmsRM7xHLZ0iAJT2tSTg, consumerQueue=fanout-queue-2])
5 通配符模式Topics
在java配置文件DirectRabbitConfig
中先聲明一個(gè)Topic交換機(jī)、兩個(gè)工作隊(duì)列和三個(gè)通配綁定鍵,其中一個(gè)隊(duì)列通過(guò)兩個(gè)不同通配綁定鍵與交換機(jī)綁定,另外一個(gè)隊(duì)列用第三個(gè)綁定鍵進(jìn)行綁定。
public static final String WORK_QUEUE = "work-queue"; // 工作隊(duì)列 /******************************* Work Queue ******************************/ @Bean public Queue workQueue() { return new Queue(WORK_QUEUE,true); }
通過(guò)rabbitmq管理頁(yè)面我們可以看到交換機(jī)與隊(duì)列的綁定變化,可以看到隊(duì)列1車工綁定了兩個(gè)通配鍵
創(chuàng)建一個(gè)消費(fèi)者類TopicConsumers
。創(chuàng)建兩個(gè)消費(fèi)者分別對(duì)兩個(gè)隊(duì)列做監(jiān)聽(tīng)。
@Component public class WorkConsumers extends Base { @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE) public void consumer1(Object testMessage) { logger.debug("work消費(fèi)者[1]收到消息 : " + testMessage.toString()); } @RabbitListener(queues = DirectRabbitConfig.WORK_QUEUE) public void consumer2(Object testMessage) { logger.debug("work消費(fèi)者[2]收到消息 : " + testMessage.toString()); } }
創(chuàng)造一個(gè)消息生產(chǎn)者。發(fā)送3條不同的消息,分別帶上三個(gè)不同的路由鍵
/** * 【Topic通配符模式】 * @param message 消息內(nèi)容 **/ @PostMapping("/topic/{message}") public String sendTopicMessage(@PathVariable("message") String message) { Map<String, Object> map = new HashMap<>(); /* 直接跟交換機(jī)MY_FANOUT_EXCHANGE交互 */ rabbitTemplate.setExchange(DirectRabbitConfig.MY_TOPIC_EXCHANGE); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", message + "TEST1"); rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_ONE,map); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", message + "TEST2"); rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_TWO,map); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", message + "TEST3"); rabbitTemplate.convertAndSend(DirectRabbitConfig.TOPIC_ROUTING_KEY_THREE,map); return "ok"; }
路由鍵聲明如下:
public static final String TOPIC_ROUTING_KEY_ONE = "topic.a1.b1.c1"; // topic路由鍵1 public static final String TOPIC_ROUTING_KEY_TWO = "topic.a1.b1"; // topic路由鍵2 public static final String TOPIC_ROUTING_KEY_THREE = "topic.a2.b1"; // topic路由鍵3
啟動(dòng)項(xiàng)目,調(diào)用生產(chǎn)者的接口,查看兩個(gè)消費(fèi)者的消費(fèi)情況。
- TOPIC消費(fèi)者[2]收到消息 : (Body:'{messageId=82abd282-1110-4f1a-b09e-ae9a43c560c3, messageData=hi topic! TEST1}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1.c1, deliveryTag=1, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])
- TOPIC消費(fèi)者[1]收到消息 : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=1, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])
- TOPIC消費(fèi)者[2]收到消息 : (Body:'{messageId=b2039557-75d8-47d5-93a0-2a03a38fabc7, messageData=hi topic! TEST2}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a1.b1, deliveryTag=2, consumerTag=amq.ctag-wlRVC5xWiN8glrtA2_i6uA, consumerQueue=topic-queue-2])
- TOPIC消費(fèi)者[1]收到消息 : (Body:'{messageId=3a8f3164-706f-4523-bd2a-4fee73595fbb, messageData=hi topic! TEST3}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myTopicExchange, receivedRoutingKey=topic.a2.b1, deliveryTag=2, consumerTag=amq.ctag-F6ByjknEnCjh7XVolNfmcg, consumerQueue=topic-queue-1])
可以看到,路由鍵前綴為topic.a1
的信息都可以被綁定了topic.a1.#
的隊(duì)列接收到,而綁定了topic.a1.*
的隊(duì)列只能接收到topic.a1
后面帶一個(gè)單詞的信息,由于隊(duì)列1還通過(guò)topic.*.b1
綁定交換機(jī),因此攜帶路由鍵"topic.a2.b1"
的信息同樣也被隊(duì)列1接收
topic交換機(jī)是direct交換機(jī)做的改造的。兩者的區(qū)別主要體現(xiàn)在路由鍵和綁定鍵格式上的限制不同。
路由鍵:必須是由點(diǎn)分隔的單詞列表。單詞形式不限。比如一個(gè)主題建:<主題1>.<主題2>.<主題3>
綁定鍵:格式上和路由鍵一致,但多了兩個(gè)通配符*
和#
,#
代表任意數(shù)量的單詞,包括0個(gè)。*
標(biāo)識(shí)一個(gè)單詞。
使用上,一個(gè)綁定鍵,我們可以看成是對(duì)一類具有多個(gè)特征的物體的一個(gè)抽象,由點(diǎn)分割的每個(gè)單詞,我們可以看成一個(gè)主題或是一個(gè)特征。因此只要做好消息特征的歸納抽象,加上通配符的使用,我們就有很高的自由度去處理任意類型的消息
總結(jié)
以上就是關(guān)于RabbitMQ五種隊(duì)列模式的實(shí)戰(zhàn)演練,關(guān)于RabbitMQ其它實(shí)戰(zhàn)與知識(shí)理解后續(xù)會(huì)相繼分享,感興趣的同學(xué)歡迎留言討論
到此這篇關(guān)于實(shí)戰(zhàn)干貨之基于SpringBoot的RabbitMQ多種模式隊(duì)列的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ 多種模式隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中的構(gòu)造方法(構(gòu)造函數(shù))與普通方法區(qū)別及說(shuō)明
這篇文章主要介紹了Java中的構(gòu)造方法(構(gòu)造函數(shù))與普通方法區(qū)別及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-03-03org.apache.ibatis.annotations不存在的問(wèn)題
這篇文章主要介紹了org.apache.ibatis.annotations不存在的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10SpringCloud容器化服務(wù)發(fā)現(xiàn)及注冊(cè)實(shí)現(xiàn)方法解析
這篇文章主要介紹了SpringCloud容器化服務(wù)發(fā)現(xiàn)及注冊(cè)實(shí)現(xiàn)方法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08基于springboot實(shí)現(xiàn)redis分布式鎖的方法
這篇文章主要介紹了基于springboot實(shí)現(xiàn)redis分布式鎖的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java中的BaseTypeHandler自定義類型轉(zhuǎn)換器的使用
這篇文章主要介紹了Java中的BaseTypeHandler自定義類型轉(zhuǎn)換器的使用,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-05-05Java的二叉樹(shù)排序以及遍歷文件展示文本格式的文件樹(shù)
這篇文章主要介紹了Java的二叉樹(shù)排序以及遍歷文件展示文本格式的文件樹(shù),是對(duì)二叉樹(shù)結(jié)構(gòu)學(xué)習(xí)的兩個(gè)很好的實(shí)踐,需要的朋友可以參考下2015-11-11java數(shù)組復(fù)制的四種方法效率對(duì)比
這篇文章主要介紹了java數(shù)組復(fù)制的四種方法效率對(duì)比,文中有簡(jiǎn)單的代碼示例,以及效率的比較結(jié)果,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11