SpringBoot整合RabbitMQ及原理
1、相關(guān)依賴
這里無需指定版本號,讓其跟著SpringBoot版本走。本示例使用SpringBoot版本號為2.7.10。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2、生產(chǎn)者、消費者
創(chuàng)建兩個SpringBoot應(yīng)用,模擬消息生產(chǎn)者與消費者【publisher、consumer】。
2-1生產(chǎn)者
編寫配置文件,用戶名和密碼等自行修改 這里虛擬機的名稱是上一篇文章中新建的。
server.port=8082 #rabbitmq服務(wù)器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=用戶名 #密碼 spring.rabbitmq.password=密碼 #配置虛擬機 spring.rabbitmq.virtual-host=demo
聲明交換機、隊列并綁定:
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public DirectExchange getExchange(){
return new DirectExchange("directExchange",false,false);
}
@Bean
public Queue getQueue(){
return new Queue("publisher.addUser",true,false,false);
}
@Bean
public Binding getBinding(DirectExchange exchange,Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
}
}新建User實體類
@Data
public class User {
private Long id;
private String name;
private String desc;
}在方法中使用RabbitTemplate來發(fā)送消息:
public interface PublisherService {
/**
* 添加用戶
* @param user 用戶信息
*/
void addUser(User user);
}@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{
private final RabbitTemplate rabbitTemplate;
@Override
public void addUser(User user) {
rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
}
}以上需要注意的就是交換機的名稱、隊列名、routingKey。示例中使用的是直連交換機,routingKey需要和隊列名保持一致。不懂的可以查看上一篇文章。
controller:
@RequiredArgsConstructor
@RestController
@RequestMapping("/user")
public class UserController {
private final PublisherService publisherService;
@PostMapping("/add")
public void add(){
User user = new User();
user.setId(1000L);
user.setName("黃忠");
user.setDesc("老兵不死,只是逐漸凋零");
publisherService.addUser(user);
}
}2-2消費者
消費者的配置和生產(chǎn)者一樣,不贅述了,直接看代碼:
@Service
@Slf4j
public class ConsumerService {
@RabbitListener(queues ="publisher.addUser")
public void addUser(String userStr){
User user = JSONObject.parseObject(userStr,User.class);
log.info(user.toString());
}
}@RabbitListener 注解是指定某方法作為消息消費的方法,指定隊列名稱。@RabbitListener 如果標注在類上,需配合 @RabbitHandler 注解一起使用,根據(jù)接受的參數(shù)類型進入具體的方法中。
2-3測試
消費端在啟動時可能會報找不到交換機或隊列,只需要讓生產(chǎn)者發(fā)送一次消息,從控制臺就可以看到相關(guān)的交換機和隊列等信息了。

可以看到消費者成功消費了消息:


3、消費流程
通過上述操作,我們已經(jīng)會簡單地使用RabbitMQ了,接下來了解一下它的整個流程。如此可以讓我們掌握的更牢固。

生產(chǎn)者:
- 生產(chǎn)者連接到Message Broker【也就是RabbitMQ服務(wù)】,建立一個連接( Connection)開啟一個信道(Channel)。
- 生產(chǎn)者聲明一個交換機,并設(shè)置相關(guān)屬性,比如交換機類型、是否持久化等。
- 生產(chǎn)者聲明一個隊列并設(shè)置相關(guān)屬性。
- 生產(chǎn)者通過路由鍵【Routing Key】將交換機和隊列綁定。
- 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
- 相應(yīng)的交換機根據(jù)接收到的路由鍵查找相匹配的隊列。
- 如果找到,則將從生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊列中。
- 如果沒有找到,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道。
- 關(guān)閉連接。
消費者:
- 消費者連接到RabbitMQ Broker ,建立一個連接(Connection),開啟一個信道(Channel) 。
- 消費者向RabbitMQ Broker 請求消費相應(yīng)隊列中的消息,可能會設(shè)置相應(yīng)的回調(diào)函數(shù),
- 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊列中的消息,消費者接收消息。
- 消費者確認(ack) 接收到的消息。
- RabbitMQ 從隊列中刪除相應(yīng)己經(jīng)被確認的消息。
- 關(guān)閉信道。
- 關(guān)閉連接。
到此這篇關(guān)于SpringBoot整合RabbitMQ及其原理分析的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot實現(xiàn)在一個模塊中引入另一個模塊
這篇文章主要介紹了SpringBoot實現(xiàn)在一個模塊中引入另一個模塊的方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-10-10
SpringBoot快速整合RabbitMq小案例(使用步驟)
這篇文章主要介紹了SpringBoot快速整合RabbitMq小案例,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06
Java線程池隊列PriorityBlockingQueue原理分析
這篇文章主要介紹了Java線程池隊列PriorityBlockingQueue原理分析,PriorityBlockingQueue隊列是?JDK1.5?的時候出來的一個阻塞隊列,但是該隊列入隊的時候是不會阻塞的,永遠會加到隊尾,需要的朋友可以參考下2023-12-12

