rabbitmq五種模式詳解(含實(shí)現(xiàn)代碼)
一、五種模式詳解
1.簡(jiǎn)單模式(Queue模式)
當(dāng)生產(chǎn)端發(fā)送消息到交換機(jī),交換機(jī)根據(jù)消息屬性發(fā)送到隊(duì)列,消費(fèi)者監(jiān)聽(tīng)綁定隊(duì)列實(shí)現(xiàn)消息的接收和消費(fèi)邏輯編寫.簡(jiǎn)單模式下,強(qiáng)調(diào)的一個(gè)隊(duì)列queue只被一個(gè)消費(fèi)者監(jiān)聽(tīng)消費(fèi).
1.1 結(jié)構(gòu)

生產(chǎn)者:生成消息,發(fā)送到交換機(jī)交換機(jī):根據(jù)消息屬性,將消息發(fā)送給隊(duì)列消費(fèi)者:監(jiān)聽(tīng)這個(gè)隊(duì)列,發(fā)現(xiàn)消息后,獲取消息執(zhí)行消費(fèi)邏輯
1.2應(yīng)用場(chǎng)景
常見(jiàn)的應(yīng)用場(chǎng)景就是一發(fā),一接的結(jié)構(gòu)
例如:
手機(jī)短信郵件單發(fā)
2.爭(zhēng)搶模式(Work模式)
強(qiáng)調(diào)的也是后端隊(duì)列與消費(fèi)者綁定的結(jié)構(gòu)
2.1結(jié)構(gòu)

生產(chǎn)者:發(fā)送消息到交換機(jī)交換機(jī):根據(jù)消息屬性將消息發(fā)送給隊(duì)列消費(fèi)者:多個(gè)消費(fèi)者,同時(shí)綁定監(jiān)聽(tīng)一個(gè)隊(duì)列,之間形成了爭(zhēng)搶消息的效果
2.2應(yīng)用場(chǎng)景
- 搶紅包
- 資源分配系統(tǒng)
3.路由模式(Route模式 Direct定向)
從路由模式開(kāi)始,關(guān)心的就是消息如何到達(dá)的隊(duì)列,路由模式需要使用的交換機(jī)類型就是路由交換機(jī)(direct)
3.1 結(jié)構(gòu)

- 生產(chǎn)端:發(fā)送消息,在消息中處理消息內(nèi)容,攜帶一個(gè)routingkey
- 交換機(jī):接收消息,根據(jù)消息的routingkey去計(jì)算匹配后端隊(duì)列的routingkey
- 隊(duì)列:存儲(chǔ)交換機(jī)發(fā)送的消息
- 消費(fèi)端:簡(jiǎn)單模式 工作爭(zhēng)搶
3.2應(yīng)用場(chǎng)景
- 短信
- 聊天工具
- 郵箱。。
手機(jī)號(hào)/郵箱地址,都可以是路由key
4.發(fā)布訂閱模式(Pulish/Subscribe模式 Fanout廣播)
不計(jì)算路由的一種特殊交換機(jī)
4.1結(jié)構(gòu)

4.2應(yīng)用場(chǎng)景
- 消息推送
- 廣告
5.主題模式(Topics模式 Tpoic通配符)
路由key值是一種多級(jí)路徑。中國(guó).四川.成都.武侯區(qū)
5.1結(jié)構(gòu)

生產(chǎn)端:攜帶路由key,發(fā)送消息到交換機(jī)
隊(duì)列:綁定交換機(jī)和路由不一樣,不是一個(gè)具體的路由key,而可以使用*和#代替一個(gè)范圍
| * | 字符串,只能表示一級(jí) |
| --- | --- |
| # | 多級(jí)字符串 |
交換機(jī):根據(jù)匹配規(guī)則,將路由key對(duì)應(yīng)發(fā)送到隊(duì)列
消息路由key:
- 北京市.朝陽(yáng)區(qū).酒仙橋
- 北京市.#: 匹配true
- 上海市.浦東區(qū).*: 沒(méi)匹配false
- 新疆.烏魯木齊.#
5.2 應(yīng)用場(chǎng)景
做物流分揀的多級(jí)傳遞.
6.完整結(jié)構(gòu)

二、代碼實(shí)現(xiàn)
1.創(chuàng)建SpringBoot工程
1.1 工程基本信息

1.2 依賴信息

1.3 配置文件applicasion.properties
# 應(yīng)用名稱 spring.application.name=springboot-demo # Actuator Web 訪問(wèn)端口 management.server.port=8801 management.endpoints.jmx.exposure.include=* management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always # 應(yīng)用服務(wù) WEB 訪問(wèn)端口 server.port=8801 ######################### RabbitMQ配置 ######################## # RabbitMQ主機(jī) spring.rabbitmq.host=127.0.0.1 # RabbitMQ虛擬主機(jī) spring.rabbitmq.virtual-host=demo # RabbitMQ服務(wù)端口 spring.rabbitmq.port=5672 # RabbitMQ服務(wù)用戶名 spring.rabbitmq.username=admin # RabbitMQ服務(wù)密碼 spring.rabbitmq.password=admin # RabbitMQ服務(wù)發(fā)布確認(rèn)屬性配置 ## NONE值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值 ## CORRELATED值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法 ## SIMPLE值經(jīng)測(cè)試有兩種效果,其一效果和CORRELATED值一樣會(huì)觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì)關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker; spring.rabbitmq.publisher-confirm-type=simple # RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認(rèn) spring.rabbitmq.publisher-returns=true ######################### simple模式配置 ######################## # RabbitMQ服務(wù) 消息接收確認(rèn)模式 ## NONE:不確認(rèn) ## AUTO:自動(dòng)確認(rèn) ## MANUAL:手動(dòng)確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual # 指定最小的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.concurrency=1 # 指定最大的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=1 # 開(kāi)啟支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
2.簡(jiǎn)單模式
2.1 創(chuàng)建SimpleQueueConfig 簡(jiǎn)單隊(duì)列配置類
package com.gmtgo.demo.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class SimpleQueueConfig {
/**
* 定義簡(jiǎn)單隊(duì)列名.
*/
private final String simpleQueue = "queue_simple";
@Bean
public Queue simpleQueue() {
return new Queue(simpleQueue);
}
}
2.2 編寫生產(chǎn)者
package com.gmtgo.demo.simple;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "簡(jiǎn)單消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend( "queue_simple", message);
}
}
}
2.3 編寫消費(fèi)者
package com.gmtgo.demo.simple;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class SimpleConsumers {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息:{}", new String(message.getBody()));
}
}
2.4 編寫訪問(wèn)類
package com.gmtgo.demo.simple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "/rabbitMq")
public class SimpleRabbitMqController {
@Autowired
private SimpleProducer simpleProducer;
@RequestMapping(value = "/simpleQueueTest")
public String simpleQueueTest() {
simpleProducer.sendMessage();
return "success";
}
}
2.5 測(cè)試啟動(dòng)項(xiàng)目訪問(wèn) simpleQueueTest
訪問(wèn)地址:http://127.0.0.1:8801/rabbitMq/simpleQueueTest
結(jié)果:

3.Work隊(duì)列
3.1 編寫工作配置
package com.gmtgo.demo.work;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class WorkQueueConfig {
/**
* 隊(duì)列名.
*/
private final String work = "work_queue";
@Bean
public Queue workQueue() {
return new Queue(work);
}
}
3.2 編寫生產(chǎn)者
package com.gmtgo.demo.work;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 10; i++) {
String message = "工作消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("work_queue", message);
}
}
}
3.3 編寫消費(fèi)者1
package com.gmtgo.demo.work;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkConsumers1 {
@RabbitListener(queues = "work_queue")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息1:{}", new String(message.getBody()));
}
}
3.4 編寫消費(fèi)者2
package com.gmtgo.demo.work;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkConsumers2 {
@RabbitListener(queues = "work_queue")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息2:{}", new String(message.getBody()));
}
}
3.5 編寫測(cè)試方法
package com.gmtgo.demo.work;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "rabbitMq")
public class WorkRabbitMqController {
@Autowired
private WorkProducer workProducer;
@RequestMapping(value = "workQueueTest")
public String workQueueTest() {
workProducer.sendMessage();
return "success";
}
}
3.6 測(cè)試啟動(dòng)項(xiàng)目訪問(wèn) workQueueTest
訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/workQueueTest
結(jié)果:

控制臺(tái)打印,發(fā)現(xiàn)10條消息 偶數(shù)條消費(fèi)者1獲取,奇數(shù)條消費(fèi)者2獲取,并且平均分配。
當(dāng)然通過(guò)代碼實(shí)現(xiàn)按需分配,即誰(shuí)的性能強(qiáng),誰(shuí)優(yōu)先原則,實(shí)現(xiàn)負(fù)載均衡。
配置可控分配數(shù)

4. 發(fā)布訂閱模式(Publish/Subscibe模式)
訂閱模式–多個(gè)消費(fèi)者監(jiān)聽(tīng)不同的隊(duì)列,但隊(duì)列都綁定同一個(gè)交換機(jī)
4.1 編寫訂閱配置類
package com.gmtgo.demo.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class FanoutQueueConfig {
/**
* 聲明隊(duì)列名.
*/
private final String fanout1 = "fanout_queue_1";
private final String fanout2 = "fanout_queue_2";
/**
* 聲明交換機(jī)的名字.
*/
private final String fanoutExchange = "fanoutExchange";
/**
* 聲明隊(duì)列.
*
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(fanout1);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(fanout2);
}
/**
* 聲明交換機(jī).
*/
@Bean
public FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
/**
* 隊(duì)列綁定交換機(jī),也可在可視化工具中進(jìn)行綁定.
*
* @return
*/
@Bean
public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) {
return BindingBuilder.bind(fanoutQueue1).to(exchange);
}
@Bean
public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) {
return BindingBuilder.bind(fanoutQueue2).to(exchange);
}
}
4.2 編寫訂閱生產(chǎn)者
package com.gmtgo.demo.fanout;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "訂閱模式消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("fanoutExchange", "", message);
}
}
}
4.3 編寫訂閱消費(fèi)者1
package com.gmtgo.demo.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutConsumers1 {
@RabbitListener(queues = "fanout_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息1:{}", new String(message.getBody()));
}
}
4.4 編寫訂閱消費(fèi)者2
package com.gmtgo.demo.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutConsumers2 {
@RabbitListener(queues = "fanout_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息2:{}", new String(message.getBody()));
}
}
4.5 編寫測(cè)試方法
package com.gmtgo.demo.fanout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "rabbitMq")
public class FanoutRabbitMqController {
@Autowired
private FanoutProducer fanoutProducer;
@RequestMapping(value = "fanoutQueueTest")
public String fanoutQueueTest() {
fanoutProducer.sendMessage();
return "success";
}
}
3.6 測(cè)試啟動(dòng)項(xiàng)目訪問(wèn) fanoutQueueTest
- 訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/fanoutQueueTest
- 結(jié)果:

控制臺(tái)打印 ,發(fā)現(xiàn)兩個(gè)綁定了不同隊(duì)列的消費(fèi)者都接受到了同一條消息查看RabbitMq 服務(wù)器:



5. 路由模式(Route模式 Direct定向)
5.1 編寫路由配置類
package com.gmtgo.demo.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class DirectQueueConfig {
/**
* 聲明隊(duì)列名.
*/
private final String direct1 = "direct_queue_1";
private final String direct2 = "direct_queue_2";
/**
* 聲明交換機(jī)的名字.
*/
private final String directExchange = "directExchange";
/**
* 聲明隊(duì)列.
*
* @return
*/
@Bean
public Queue directQueue1() {
return new Queue(direct1);
}
@Bean
public Queue directQueue2() {
return new Queue(direct2);
}
/**
* 聲明路由交換機(jī).
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchange);
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
*
* @return
*/
@Bean
Binding bindingDirectExchange1(Queue directQueue1, DirectExchange exchange) {
return BindingBuilder.bind(directQueue1).to(exchange).with("update");
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
*
* @return
*/
@Bean
Binding bindingDirectExchange2(Queue directQueue2, DirectExchange exchange) {
return BindingBuilder.bind(directQueue2).to(exchange).with("add");
}
}
5.2 編寫生產(chǎn)者
package com.gmtgo.demo.direct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageA() {
for (int i = 0; i < 5; i++) {
String message = "路由模式--routingKey=update消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("directExchange", "update", message);
}
}
public void sendMessageB() {
for (int i = 0; i < 5; i++) {
String message = "路由模式--routingKey=add消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("directExchange", "add", message);
}
}
}
5.3 編寫消費(fèi)者1
package com.gmtgo.demo.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectConsumers1 {
@RabbitListener(queues = "direct_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息1:{}", new String(message.getBody()));
}
}
5.4 編寫消費(fèi)者2
package com.gmtgo.demo.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectConsumers2 {
@RabbitListener(queues = "direct_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息2:{}", new String(message.getBody()));
}
}
5.5 編寫訪問(wèn)類
package com.gmtgo.demo.direct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "rabbitMq")
public class DirectRabbitMqController {
@Autowired
private DirectProducer directProducer;
@RequestMapping(value = "directQueueTest1")
public String directQueueTest1() {
directProducer.sendMessageA();
return "success";
}
@RequestMapping(value = "directQueueTest2")
public String directQueueTest2() {
directProducer.sendMessageB();
return "success";
}
}
5.6 測(cè)試啟動(dòng)項(xiàng)目訪問(wèn)directQueueTest1 , directQueueTest2
訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/directQueueTest1
訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/directQueueTest2
結(jié)果:directQueueTest1:

directQueueTest2:

6. 主題模式(Topics模式 Tpoic通配符)
6.1 編寫路由配置類
package com.gmtgo.demo.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class TopicQueueConfig {
/**
* 聲明隊(duì)列名.
*/
private final String topic1 = "topic_queue_1";
private final String topic2 = "topic_queue_2";
/**
* 聲明交換機(jī)的名字.
*/
private final String topicExchange = "topicExchange";
/**
* 聲明隊(duì)列.
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(topic1);
}
@Bean
public Queue topicQueue2() {
return new Queue(topic2);
}
/**
* 聲明路由交換機(jī).
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchange);
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
*
* @return
*/
@Bean
Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
* 綁定的routing key 也可以使用通配符:
* *:匹配不多不少一個(gè)詞
* #:匹配一個(gè)或多個(gè)詞
*
* @return
*/
@Bean
Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
}
}
6.2 編寫生產(chǎn)者
package com.gmtgo.demo.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageA() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.keyA消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message);
}
}
public void sendMessageB() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.#消息" + i;
log.info("我是生產(chǎn)信息:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message);
}
}
}
6.3 編寫消費(fèi)者1
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicConsumers1 {
@RabbitListener(queues = "topic_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息1:{}",new String(message.getBody()));
}
}
6.4 編寫消費(fèi)者2
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicConsumers2 {
@RabbitListener(queues = "topic_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費(fèi)信息2:{}",new String(message.getBody()));
}
}
6.5 編寫訪問(wèn)類
package com.gmtgo.demo.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "rabbitMq")
public class TopicRabbitMqController {
@Autowired
private TopicProducer topicProducer;
@RequestMapping(value = "topicQueueTest1")
public String topicQueueTest1() {
topicProducer.sendMessageA();
return "success";
}
@RequestMapping(value = "topicQueueTest2")
public String topicQueueTest2() {
topicProducer.sendMessageB();
return "success";
}
}
6.6 測(cè)試啟動(dòng)項(xiàng)目訪問(wèn)topicQueueTest1 , topicQueueTest2
- 訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/topicQueueTest1
- 訪問(wèn)地址http://127.0.0.1:8801/rabbitMq/topicQueueTest2
- 結(jié)果:
topicQueueTest1,兩個(gè)消費(fèi)者都能消費(fèi)

topicQueueTest2,只有消費(fèi)者2 可以消費(fèi)

至此,五種隊(duì)列的實(shí)現(xiàn)已結(jié)束!
7. 實(shí)現(xiàn)生產(chǎn)者消息確認(rèn)
7.1 配置文件
######################### RabbitMQ配置 ######################## # RabbitMQ主機(jī) spring.rabbitmq.host=127.0.0.1 # RabbitMQ虛擬主機(jī) spring.rabbitmq.virtual-host=demo # RabbitMQ服務(wù)端口 spring.rabbitmq.port=5672 # RabbitMQ服務(wù)用戶名 spring.rabbitmq.username=admin # RabbitMQ服務(wù)密碼 spring.rabbitmq.password=admin # RabbitMQ服務(wù)發(fā)布確認(rèn)屬性配置 ## NONE值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值 ## CORRELATED值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法 ## SIMPLE值經(jīng)測(cè)試有兩種效果,其一效果和CORRELATED值一樣會(huì)觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì)關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker; spring.rabbitmq.publisher-confirm-type=simple # 連接超時(shí)時(shí)間 spring.rabbitmq.connection-timeout=20000 # RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認(rèn) spring.rabbitmq.publisher-returns=true ######################### simple模式配置 ######################## # RabbitMQ服務(wù) 消息接收確認(rèn)模式 ## NONE:不確認(rèn) ## AUTO:自動(dòng)確認(rèn) ## MANUAL:手動(dòng)確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual # 指定最小的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.concurrency=1 # 指定最大的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=1 # 每次只消費(fèi)一個(gè)消息 spring.rabbitmq.listener.simple.prefetch=1 # 開(kāi)啟支持重試 spring.rabbitmq.listener.simple.retry.enabled=true # 啟用強(qiáng)制信息,默認(rèn)為false spring.rabbitmq.template.mandatory=true
7.2 編寫消息發(fā)送確認(rèn)類 RabbitConfirmCallback
package com.gmtgo.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author 大帥
*/
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("=======ConfirmCallback=========");
log.info("correlationData {} " , correlationData);
log.info("ack = {}" , ack);
log.info("cause = {}" , cause);
log.info("=======ConfirmCallback=========");
}
}
7.3 編寫消息發(fā)送交換機(jī)返回機(jī)制RabbitConfirmReturnCallBack
package com.gmtgo.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author 大帥
*/
@Slf4j
public class RabbitConfirmReturnCallBack implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("--------------ReturnCallback----------------");
log.info("message = " + message);
log.info("replyCode = {}", replyCode);
log.info("replyText = {}", replyText);
log.info("exchange = {}", exchange);
log.info("routingKey = {}", routingKey);
log.info("--------------ReturnCallback----------------");
}
}
7.4 RabbitMQ配置
在我們的rabbit隊(duì)列配置類里設(shè)置RabbitTemplate
舉例:
package com.gmtgo.demo.topic;
import com.gmtgo.demo.config.RabbitConfirmCallback;
import com.gmtgo.demo.config.RabbitConfirmReturnCallBack;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @author 大帥
*/
@Configuration
public class TopicQueueConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
// 設(shè)置生產(chǎn)者消息確認(rèn)
rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
rabbitTemplate.setReturnCallback(new RabbitConfirmReturnCallBack());
}
/**
* 聲明隊(duì)列名.
*/
private final String topic1 = "topic_queue_1";
private final String topic2 = "topic_queue_2";
/**
* 聲明交換機(jī)的名字.
*/
private final String topicExchange = "topicExchange";
/**
* 聲明隊(duì)列.
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(topic1);
}
@Bean
public Queue topicQueue2() {
return new Queue(topic2);
}
/**
* 聲明路由交換機(jī).
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchange);
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
*
* @return
*/
@Bean
Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
}
/**
* 隊(duì)列綁定交換機(jī),指定routingKey,也可在可視化工具中進(jìn)行綁定.
* 綁定的routing key 也可以使用通配符:
* *:匹配不多不少一個(gè)詞
* #:匹配一個(gè)或多個(gè)詞
*
* @return
*/
@Bean
Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
}
}
啟動(dòng)項(xiàng)目發(fā)送消息,消息被正常消費(fèi),confim回調(diào)返回ack=true如果我們將exchange修改,發(fā)送到一個(gè)不存在的exchange中,會(huì)怎么樣呢?
會(huì)發(fā)現(xiàn)confirm回調(diào)為false,打印出結(jié)果為不存在topicExchange1111的交換機(jī)

如果我們?cè)谙M(fèi)端處理邏輯時(shí)出錯(cuò)會(huì)怎么樣呢?修改消費(fèi)端代碼我們?cè)谙M(fèi)時(shí)讓它報(bào)錯(cuò)

confirm回調(diào)為true,但是在rabbitmq的web界面會(huì)發(fā)現(xiàn)存在5條沒(méi)有消費(fèi)的消息

如果我們把
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
中最后一個(gè)參數(shù)改為false呢,會(huì)發(fā)現(xiàn)在web管理界面沒(méi)有未被消費(fèi)的消息,說(shuō)明這條消息已經(jīng)被摒棄。
實(shí)際開(kāi)發(fā)中,到底是打回到隊(duì)列呢還是摒棄,要看自己的需求,但是打回隊(duì)列應(yīng)該有次數(shù)限制,不然會(huì)陷入死循環(huán)。
繼續(xù)測(cè)試,將routingKey修改為一個(gè)沒(méi)有的key,
7.5 結(jié)論
- 如果消息沒(méi)有到exchange,則confirm回調(diào),ack=false
- 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
- exchange到queue成功,則不回調(diào)return
- exchange到queue失敗,則回調(diào)return
8. 項(xiàng)目示例代碼:
下載地址:springboot-rabbitmq-demo_1619322789961
到此這篇關(guān)于rabbitmq五種模式詳解(含實(shí)現(xiàn)代碼)的文章就介紹到這了,更多相關(guān)rabbitmq五種模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud通過(guò)Feign傳遞List類型參數(shù)方式
這篇文章主要介紹了SpringCloud通過(guò)Feign傳遞List類型參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
springboot中如何配置LocalDateTime JSON返回時(shí)間戳
這篇文章主要介紹了springboot中如何配置LocalDateTime JSON返回時(shí)間戳問(wèn)題。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06
Java導(dǎo)出Excel通用工具類實(shí)例代碼
這篇文章主要給大家介紹了關(guān)于Java導(dǎo)出Excel通用工具類的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
JavaWeb基于Session實(shí)現(xiàn)的用戶登陸注銷方法示例
為了安全起見(jiàn),session常常用來(lái)保存用戶的登錄信息。那么服務(wù)器是怎么來(lái)實(shí)現(xiàn)的呢?下面這篇文章就來(lái)給大家介紹了關(guān)于JavaWeb基于Session實(shí)現(xiàn)的用戶登陸注銷的相關(guān)資料,需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(40)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07
關(guān)于@ComponentScan注解的用法及作用說(shuō)明
這篇文章主要介紹了關(guān)于@ComponentScan注解的用法及作用說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
java實(shí)現(xiàn)一個(gè)接口調(diào)取另一個(gè)接口(接口一調(diào)取接口二)
這篇文章主要介紹了java實(shí)現(xiàn)一個(gè)接口調(diào)取另一個(gè)接口(接口一調(diào)取接口二),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09

