Springboot?整合?RabbitMQ?消息隊列?詳情
生產(chǎn)者工程
POM依賴
可以在創(chuàng)建工程時直接選擇添加依賴。
application文件
因為rabbitmq具有默認地址及用戶信息,所以如果是本地rabbitmq可以不需要進行配置。
RabbitMQ配置文件:
在使用相關交換機及隊列時,我們需要實現(xiàn)聲明交換機及隊列,如果沒有對應信息,則啟動項目會失敗。所以在使用springboot整合rabbitmq時,我們可以通過配置文件來進行交換機、隊列的聲明及二者之間的關系綁定。 由于目前在演示Fanout模式,所以使用FanoutExchange來聲明交換機,其他模式則使用相對應的TopicExchange,DirectExchange來聲明。
@Configuration public class RabbitMQConfiguration { //聲明fanout模式的交換機 @Bean public FanoutExchange fanoutExchange() { ? ? return new FanoutExchange("fanout_order_exchange", true, false); } //聲明隊列 @Bean public Queue smsQueue() { ? ? return new Queue("sms.fanout.queue", true); } @Bean public Queue emailQueue() { ? ? return new Queue("email.fanout.queue", true); } @Bean public Queue duanxinQueue() { ? ? return new Queue("duanxin.fanout.queue", true); } //綁定 @Bean public Binding smsBinding() { ? ? return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } @Bean public Binding emailBinding() { ? ? return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); } @Bean public Binding duanxinBinding() { ? ? return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()); } }
生產(chǎn)者業(yè)務代碼
這部分代碼就簡單的通過調用rabbitTemplate來進行消息的分發(fā)。@Service public class OrderService {
@Autowired private RabbitTemplate rabbitTemplate; public void makeOrder() { ? ? // 保存訂單 ? ? String orderId = UUID.randomUUID().toString(); ? ? System.out.println("下單成功:" + orderId); ? ? // 通過MQ完成消息的分發(fā) ? ? // 參數(shù)1:交換機 ;參數(shù)2:路由key/隊列名;參數(shù)3:消息內容 ? ? String exchangeName = "fanout_order_exchange"; ? ? rabbitTemplate.convertAndSend(exchangeName, "", orderId); } }
消費者:
消費者工程和生產(chǎn)者工程類似,我們首先需要引入依賴,然后在application文件中進行相關的配置即可開始編寫代碼。 在消費者工程中我們也可以編寫rabbitmq的配置文件來進行交換機及隊列的聲明。建議在消費端編寫配置文件,因為消費端是先啟動的工程,如果交換機和隊列未創(chuàng)建會導致工程啟動失敗。 消息監(jiān)聽
我們通過RabbitListener注解來監(jiān)聽消息隊列。需要注意的是我們需要通過Component注解將該監(jiān)聽交給spring管理,否則不能正常接收服務端的消息。 這邊只給出一個email的消息監(jiān)聽,上文生產(chǎn)者聲明的duanxin,sms隊列可以自行創(chuàng)建,只需要修改隊列名即可。@Service public class OrderService {
@RabbitListener(queues = {"email.fanout.queue"}) @Component public class FanoutEmailService { ? ? @RabbitHandler ? ? public void receive(String message) { ? ? ? ? System.out.println("email fanout -----》接收到" + message); ? ? } }
測試
首先啟動消費者工程,然后在生產(chǎn)者工程中創(chuàng)建測試類發(fā)送消息即可。
@SpringBootTest class SpringbootOrderRabbitmqProducerApplicationTests { @Autowired private OrderService orderService; @Test void contextLoads() { ? ? orderService.makeOrder(); } }
當發(fā)送消息后,我們可以在控制臺中發(fā)現(xiàn)消費者成功接受消息。
Direct 模式
生產(chǎn)者
建立工程的步驟和上文相同。
配置文件
配置和上文基本相同,由于該部分測試direct模式,所以需要使用DirectExchange創(chuàng)建交換機。需要注意的是該類中的方法名不能和上文rabbitmq的配置文件中的方法名相同,因為我們使用bean注解將其交給spring管理,如果名字相同,則會啟動項目失敗。
@Configuration public class DirectRabbitMQConfiguration { //聲明direct模式的交換機 @Bean public DirectExchange directExchange() { ? ? return new DirectExchange("direct_order_exchange", true, false); } //聲明隊列 @Bean public Queue smsDirectQueue() { ? ? return new Queue("sms.direct.queue", true); } @Bean public Queue emailDirectQueue() { ? ? return new Queue("email.direct.queue", true); } @Bean public Queue duanxinDirectQueue() { ? ? return new Queue("duanxin.direct.queue", true); } //綁定 @Bean public Binding smsDirectBinding() { ? ? return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms"); } @Bean public Binding emailDirectBinding() { ? ? return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email"); } @Bean public Binding duanxinDirectBinding() { ? ? return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin"); } }
業(yè)務代碼
@Service public class OrderService { ? ? @Autowired ? ? private RabbitTemplate rabbitTemplate; ? ? public void makeOrderDirect() { ? ? ? ? // 保存訂單 ? ? ? ? String orderId = UUID.randomUUID().toString(); ? ? ? ? System.out.println("下單成功:" + orderId); ? ? ? ? String exchangeName = "direct_order_exchange"; ? ? ? ? rabbitTemplate.convertAndSend(exchangeName, "sms", orderId); ? ? ? ? rabbitTemplate.convertAndSend(exchangeName, "email", orderId); ? ? } }
消費者
消息監(jiān)聽
和上文相同,只需注意隊列名即可。
@RabbitListener(queues = {"email.direct.queue"}) @Component public class DirectEmailService { ? ? @RabbitHandler ? ? public void receive(String message) { ? ? ? ? System.out.println("email direct -----》接收到" + message); ? ? } }
Topic 模式
上文中個模式都是通過配置文件來聲明交換機,隊列及綁定二者之間的關系;實際上我們還可以通過注解的方式來聲明交換機及注解。
生產(chǎn)者
由于使用注解方式聲明,所以我們不需要創(chuàng)建配置文件,直接編寫業(yè)務代碼即可。測試的時候我們只需修改路由名即可,具體如何修改,請前往文章開頭鏈接查看各模式是如何使用的。
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrderTopic() { ? ? // 保存訂單 ? ? String orderId = UUID.randomUUID().toString(); ? ? System.out.println("下單成功:" + orderId); ? ? String exchangeName = "topic_order_exchange"; ? ? String routingKey = "com.email"; ? ? rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId); } }
消費者
代碼和上文基本相同,區(qū)別在于我們直接在RabbitListener注解中將隊列和交換機進行綁定。需要注意的是各參數(shù)中都是使用字符串。 value對應的是隊列,相應的參數(shù)分別是隊列名、持久化、自動刪除。 exchange對應的交換機,相應的參數(shù)分別是交換機名以及交換機類型。 key對應的是路由名。
@RabbitListener(bindings = @QueueBinding( ? ? ? ? value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"), ? ? ? ? exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), ? ? ? ? key = "*.email.#" )) @Component public class TopicEmailService { ? ? @RabbitHandler ? ? public void receive(String message) { ? ? ? ? System.out.println("email topic -----》接收到" + message); ? ? } }
以上便是springboot 整合 rabbitmq的兩種方式。但是在日常開發(fā)中更推薦使用配置文件的形式來實現(xiàn),因為在配置文件中可以更好的處理過期時間、死信隊列等消息隊列中的高級特性。
到此這篇關于Springboot 整合 RabbitMQ 消息隊列 詳情的文章就介紹到這了,更多相關Springboot 整合 RabbitMQ 內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
結合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法
這篇文章主要介紹了結合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07MySQL查詢字段實現(xiàn)字符串分割split功能的示例代碼
本文主要介紹了MySQL查詢字段實現(xiàn)字符串分割split功能的示例代碼,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01springboot讀取application.yml報錯問題及解決
這篇文章主要介紹了springboot讀取application.yml報錯問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06