SpringBoot整合RabbitMQ及原理
上一篇:RabbitMQ基礎知識
1、相關依賴
這里無需指定版本號,讓其跟著SpringBoot版本走。本示例使用SpringBoot版本號為2.7.10。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2、生產者、消費者
創(chuàng)建兩個SpringBoot應用,模擬消息生產者與消費者【publisher、consumer】。
2-1生產者
編寫配置文件,用戶名和密碼等自行修改 這里虛擬機的名稱是上一篇文章中新建的。
server.port=8082 #rabbitmq服務器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=用戶名 #密碼 spring.rabbitmq.password=密碼 #配置虛擬機 spring.rabbitmq.virtual-host=demo
聲明交換機、隊列并綁定:
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public DirectExchange getExchange(){
return new DirectExchange("directExchange",false,false);
}
@Bean
public Queue getQueue(){
return new Queue("publisher.addUser",true,false,false);
}
@Bean
public Binding getBinding(DirectExchange exchange,Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
}
}新建User實體類
@Data
public class User {
private Long id;
private String name;
private String desc;
}在方法中使用RabbitTemplate來發(fā)送消息:
public interface PublisherService {
/**
* 添加用戶
* @param user 用戶信息
*/
void addUser(User user);
}@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{
private final RabbitTemplate rabbitTemplate;
@Override
public void addUser(User user) {
rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
}
}以上需要注意的就是交換機的名稱、隊列名、routingKey。示例中使用的是直連交換機,routingKey需要和隊列名保持一致。不懂的可以查看上一篇文章。
controller:
@RequiredArgsConstructor
@RestController
@RequestMapping("/user")
public class UserController {
private final PublisherService publisherService;
@PostMapping("/add")
public void add(){
User user = new User();
user.setId(1000L);
user.setName("黃忠");
user.setDesc("老兵不死,只是逐漸凋零");
publisherService.addUser(user);
}
}2-2消費者
消費者的配置和生產者一樣,不贅述了,直接看代碼:
@Service
@Slf4j
public class ConsumerService {
@RabbitListener(queues ="publisher.addUser")
public void addUser(String userStr){
User user = JSONObject.parseObject(userStr,User.class);
log.info(user.toString());
}
}@RabbitListener 注解是指定某方法作為消息消費的方法,指定隊列名稱。@RabbitListener 如果標注在類上,需配合 @RabbitHandler 注解一起使用,根據接受的參數類型進入具體的方法中。
2-3測試
消費端在啟動時可能會報找不到交換機或隊列,只需要讓生產者發(fā)送一次消息,從控制臺就可以看到相關的交換機和隊列等信息了。

可以看到消費者成功消費了消息:


3、消費流程
通過上述操作,我們已經會簡單地使用RabbitMQ了,接下來了解一下它的整個流程。如此可以讓我們掌握的更牢固。

生產者:
- 生產者連接到Message Broker【也就是RabbitMQ服務】,建立一個連接( Connection)開啟一個信道(Channel)。
- 生產者聲明一個交換機,并設置相關屬性,比如交換機類型、是否持久化等。
- 生產者聲明一個隊列并設置相關屬性。
- 生產者通過路由鍵【Routing Key】將交換機和隊列綁定。
- 生產者發(fā)送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
- 相應的交換機根據接收到的路由鍵查找相匹配的隊列。
- 如果找到,則將從生產者發(fā)送過來的消息存入相應的隊列中。
- 如果沒有找到,則根據生產者配置的屬性選擇丟棄還是回退給生產者
- 關閉信道。
- 關閉連接。
消費者:
- 消費者連接到RabbitMQ Broker ,建立一個連接(Connection),開啟一個信道(Channel) 。
- 消費者向RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數,
- 等待RabbitMQ Broker 回應并投遞相應隊列中的消息,消費者接收消息。
- 消費者確認(ack) 接收到的消息。
- RabbitMQ 從隊列中刪除相應己經被確認的消息。
- 關閉信道。
- 關閉連接。
到此這篇關于SpringBoot整合RabbitMQ及其原理分析的文章就介紹到這了,更多相關SpringBoot整合RabbitMQ內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot快速整合RabbitMq小案例(使用步驟)
這篇文章主要介紹了SpringBoot快速整合RabbitMq小案例,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06
Java線程池隊列PriorityBlockingQueue原理分析
這篇文章主要介紹了Java線程池隊列PriorityBlockingQueue原理分析,PriorityBlockingQueue隊列是?JDK1.5?的時候出來的一個阻塞隊列,但是該隊列入隊的時候是不會阻塞的,永遠會加到隊尾,需要的朋友可以參考下2023-12-12

