SpringBoot整合RabbitMQ及原理
1、相關(guān)依賴
這里無(wú)需指定版本號(hào),讓其跟著SpringBoot版本走。本示例使用SpringBoot版本號(hào)為2.7.10。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2、生產(chǎn)者、消費(fèi)者
創(chuàng)建兩個(gè)SpringBoot應(yīng)用,模擬消息生產(chǎn)者與消費(fèi)者【publisher、consumer】。
2-1生產(chǎn)者
編寫配置文件,用戶名和密碼等自行修改 這里虛擬機(jī)的名稱是上一篇文章中新建的。
server.port=8082 #rabbitmq服務(wù)器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=用戶名 #密碼 spring.rabbitmq.password=密碼 #配置虛擬機(jī) spring.rabbitmq.virtual-host=demo
聲明交換機(jī)、隊(duì)列并綁定:
@Configuration public class RabbitMqConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public DirectExchange getExchange(){ return new DirectExchange("directExchange",false,false); } @Bean public Queue getQueue(){ return new Queue("publisher.addUser",true,false,false); } @Bean public Binding getBinding(DirectExchange exchange,Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser"); } }
新建User實(shí)體類
@Data public class User { private Long id; private String name; private String desc; }
在方法中使用RabbitTemplate來(lái)發(fā)送消息:
public interface PublisherService { /** * 添加用戶 * @param user 用戶信息 */ void addUser(User user); }
@RequiredArgsConstructor @Service public class PublisherServiceImpl implements PublisherService{ private final RabbitTemplate rabbitTemplate; @Override public void addUser(User user) { rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user); } }
以上需要注意的就是交換機(jī)的名稱、隊(duì)列名、routingKey。示例中使用的是直連交換機(jī),routingKey需要和隊(duì)列名保持一致。不懂的可以查看上一篇文章。
controller:
@RequiredArgsConstructor @RestController @RequestMapping("/user") public class UserController { private final PublisherService publisherService; @PostMapping("/add") public void add(){ User user = new User(); user.setId(1000L); user.setName("黃忠"); user.setDesc("老兵不死,只是逐漸凋零"); publisherService.addUser(user); } }
2-2消費(fèi)者
消費(fèi)者的配置和生產(chǎn)者一樣,不贅述了,直接看代碼:
@Service @Slf4j public class ConsumerService { @RabbitListener(queues ="publisher.addUser") public void addUser(String userStr){ User user = JSONObject.parseObject(userStr,User.class); log.info(user.toString()); } }
@RabbitListener 注解是指定某方法作為消息消費(fèi)的方法,指定隊(duì)列名稱。@RabbitListener 如果標(biāo)注在類上,需配合 @RabbitHandler 注解一起使用,根據(jù)接受的參數(shù)類型進(jìn)入具體的方法中。
2-3測(cè)試
消費(fèi)端在啟動(dòng)時(shí)可能會(huì)報(bào)找不到交換機(jī)或隊(duì)列,只需要讓生產(chǎn)者發(fā)送一次消息,從控制臺(tái)就可以看到相關(guān)的交換機(jī)和隊(duì)列等信息了。
可以看到消費(fèi)者成功消費(fèi)了消息:
3、消費(fèi)流程
通過(guò)上述操作,我們已經(jīng)會(huì)簡(jiǎn)單地使用RabbitMQ了,接下來(lái)了解一下它的整個(gè)流程。如此可以讓我們掌握的更牢固。
生產(chǎn)者:
- 生產(chǎn)者連接到Message Broker【也就是RabbitMQ服務(wù)】,建立一個(gè)連接( Connection)開啟一個(gè)信道(Channel)。
- 生產(chǎn)者聲明一個(gè)交換機(jī),并設(shè)置相關(guān)屬性,比如交換機(jī)類型、是否持久化等。
- 生產(chǎn)者聲明一個(gè)隊(duì)列并設(shè)置相關(guān)屬性。
- 生產(chǎn)者通過(guò)路由鍵【Routing Key】將交換機(jī)和隊(duì)列綁定。
- 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
- 相應(yīng)的交換機(jī)根據(jù)接收到的路由鍵查找相匹配的隊(duì)列。
- 如果找到,則將從生產(chǎn)者發(fā)送過(guò)來(lái)的消息存入相應(yīng)的隊(duì)列中。
- 如果沒有找到,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道。
- 關(guān)閉連接。
消費(fèi)者:
- 消費(fèi)者連接到RabbitMQ Broker ,建立一個(gè)連接(Connection),開啟一個(gè)信道(Channel) 。
- 消費(fèi)者向RabbitMQ Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,可能會(huì)設(shè)置相應(yīng)的回調(diào)函數(shù),
- 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息,消費(fèi)者接收消息。
- 消費(fèi)者確認(rèn)(ack) 接收到的消息。
- RabbitMQ 從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息。
- 關(guān)閉信道。
- 關(guān)閉連接。
到此這篇關(guān)于SpringBoot整合RabbitMQ及其原理分析的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis不加@Parm注解報(bào)錯(cuò)的解決方案
這篇文章主要介紹了mybatis不加@Parm注解報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11SpringBoot實(shí)現(xiàn)在一個(gè)模塊中引入另一個(gè)模塊
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)在一個(gè)模塊中引入另一個(gè)模塊的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10SpringBoot快速整合RabbitMq小案例(使用步驟)
這篇文章主要介紹了SpringBoot快速整合RabbitMq小案例,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06java實(shí)現(xiàn)寫入并保存txt文件的代碼詳解
在本篇文章里小編給大家整理了關(guān)于java實(shí)現(xiàn)寫入并保存txt文件的代碼實(shí)例內(nèi)容,需要的朋友們可以參考學(xué)習(xí)下。2020-02-02解決IDEA target文件夾越來(lái)越大的問(wèn)題
這篇文章主要介紹了解決IDEA target文件夾越來(lái)越大的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Java線程池隊(duì)列PriorityBlockingQueue原理分析
這篇文章主要介紹了Java線程池隊(duì)列PriorityBlockingQueue原理分析,PriorityBlockingQueue隊(duì)列是?JDK1.5?的時(shí)候出來(lái)的一個(gè)阻塞隊(duì)列,但是該隊(duì)列入隊(duì)的時(shí)候是不會(huì)阻塞的,永遠(yuǎn)會(huì)加到隊(duì)尾,需要的朋友可以參考下2023-12-12