SpringBoot整合RabbitMQ 手動(dòng)應(yīng)答(簡(jiǎn)單demo)
版本說明
- JDK 1.8
- RabbitMQ 3.7.15 Erlang 22.0
- SpringBoot 2.3.3.RELEASE
- // TODO 2021年1月8日 整理CentOS安裝RabbitMQ流程
1. 在RabbitMQ的Web管理界面,創(chuàng)建test隊(duì)列
參數(shù)的含義
durability:是否持久化(重啟或宕機(jī)后消息依然保存)
- durable 持久
- transient 暫時(shí)
新建maven項(xiàng)目。
2. pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.demo</groupId> <artifactId>rabbitmq-demo</artifactId> <version>1.0.0</version> <properties> <lombok.version>1.18.12</lombok.version> </properties> <dependencies> <!--web 模塊--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- AMQP --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> <scope>provided</scope> <version>${lombok.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
3. application.yaml
server: port: 20002 spring: rabbitmq: # 這里我改了本地的hosts,實(shí)際地址是192.168.0.121 host: vm.com port: 5672 virtual-host: / username: admin password: admin # 開啟消息確認(rèn)模式 # 消息發(fā)送到交換機(jī)確認(rèn)機(jī)制,是否確認(rèn)回調(diào) # publisher-confirms: true # 是否返回回調(diào) publisher-returns: true template: #開啟mandatory: true, basic.return方法將消息返還給生產(chǎn)者 mandatory: true listener: simple: # 手動(dòng)應(yīng)答 acknowledge-mode: manual # 最少消費(fèi)者數(shù)量 concurrency: 1 # 最多消費(fèi)者數(shù)量 max-concurrency: 10 # 支持重試 retry: enabled: true
端口
- 5672:RabbitMQ的通信端口
- 15672:Web管理界面端口
4. RabbitmqDemo.java
@SpringBootApplication @EnableRabbit public class RabbitmqDemoApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
5. RabbitConfig.java
@Configuration @Slf4j public class RabbitConfig { private RabbitTemplate rabbitTemplate; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
配置RabbitMQ的消息模板。
6. 消息生產(chǎn)者 produce.java
@Component public class Producer { // @Qualifier("rabbitTemplate") @Autowired private RabbitTemplate rabbitTemplate; public void send() { for (int i = 0; i < 5; i++) { System.out.println("生產(chǎn)者發(fā)送消息,序號(hào)為: " + i); rabbitTemplate.convertAndSend("test", String.valueOf(i)); } } }
初始化消息發(fā)送模板RabbitTemplate,@Qualifier注解用于限定具體的實(shí)現(xiàn)類,這里可以不指定。
7. 消息消費(fèi)者 consumer.java
消費(fèi)者1和消費(fèi)者2均監(jiān)聽test隊(duì)列。
不同的是,消費(fèi)者1收到消息后返回確認(rèn)應(yīng)答basicAck。
而消費(fèi)者2收到消息后返回拒絕應(yīng)答basicRegect,消息被消費(fèi)者拒絕后重新回到test隊(duì)列中,等待下次發(fā)送給消費(fèi)者。
@Component @Slf4j public class Consumer { /** * 消費(fèi)者1 模擬正常處理消息的情況,消息處理完畢發(fā)送確認(rèn)應(yīng)答 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "test") public void process1(Message message, Channel channel) throws IOException { log.info("消費(fèi)者1 接收消息: " + new String(message.getBody())); log.info("消費(fèi)者1 確認(rèn)應(yīng)答消息:" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } /** * 消費(fèi)者2 模擬處理消息出錯(cuò)的情況,消費(fèi)者2向rabbitmq發(fā)送拒絕應(yīng)答。 * 處理失敗的消息會(huì)被重新放入ready中,再次發(fā)送給消費(fèi)者,直至收到確認(rèn)應(yīng)答 * @param message * @param channel * @throws IOException */ @RabbitListener(queues = "test") public void process2(Message message, Channel channel) throws IOException { log.info("消費(fèi)者2 接收消息:" + new String(message.getBody())); log.info("消費(fèi)者2 拒絕應(yīng)答消息:" + new String(message.getBody())); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }
8. 測(cè)試RabbitMqController.java
@RestController @RequestMapping("") public class RabbitMqController { @Autowired private Producer producer; @GetMapping("/send") public String send() { producer.send(); return "發(fā)送完成"; } }
9. 測(cè)試
使用postman或?yàn)g覽器使用Get方法請(qǐng)求http://localhost:20001/send,生產(chǎn)者會(huì)向RabbitMQ的test隊(duì)列發(fā)送5條消息:
生產(chǎn)者發(fā)送消息,序號(hào)為: 0
生產(chǎn)者發(fā)送消息,序號(hào)為: 1
生產(chǎn)者發(fā)送消息,序號(hào)為: 2
生產(chǎn)者發(fā)送消息,序號(hào)為: 3
生產(chǎn)者發(fā)送消息,序號(hào)為: 4
可以看出序號(hào)為2的消息3次被消費(fèi)者2接收,消費(fèi)者2也3次發(fā)送拒絕應(yīng)答,直到第4次才被消費(fèi)者1接收,并返回確認(rèn)應(yīng)答。
到此這篇關(guān)于SpringBoot整合RabbitMQ 手動(dòng)應(yīng)答 簡(jiǎn)單demo的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot實(shí)現(xiàn)STOMP協(xié)議的WebSocket的方法步驟
這篇文章主要介紹了Spring Boot實(shí)現(xiàn)STOMP協(xié)議的WebSocket的方法步驟,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-05-05java顯示當(dāng)前的系統(tǒng)時(shí)間
這篇文章主要介紹了java如何顯示當(dāng)前的系統(tǒng)時(shí)間,代碼很簡(jiǎn)單,自己可以自定義顯示的系統(tǒng)時(shí)間的顏色和字體,需要的朋友可以參考下2015-10-10解析Linux系統(tǒng)中JVM內(nèi)存2GB上限的詳解
本篇文章是對(duì)Linux系統(tǒng)中JVM內(nèi)存2GB上限進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05