SpringBoot實現RabbitMQ三種使用方式
基于API的方式
1.使用AmqpAdmin定制消息發(fā)送組件
@Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin(){ //1.定義fanout類型的交換器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定義兩個默認持久化隊列,分別處理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); //3.將隊列分別與交換器進行綁定 // 隊列名 是隊列 交換機的名稱 路由 其它參數 amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); }
2.消息發(fā)送者發(fā)送消息
創(chuàng)建實體類
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class User { private Integer id; private String name; }
發(fā)送消息
@Autowired private RabbitTemplate re; @Test//消息發(fā)送者 public void subPublisher(){ User user = new User(1,"小滿"); re.convertAndSend("fanout_exchange", "", user); }
如圖所以,如果我們直接發(fā)送的話就會報這個錯,有兩種解決方法,第一種是比較常用的讓實體類User實現序列化Serializable接口,這里我們不做演示,第二種是寫一個配置類,只有在RabbitMQ可以使用
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //定制JSON格式的消息轉化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
加上配置類后我們發(fā)送就不會報錯了,我們也可以在RabbitMQ的可視化端口看到我們發(fā)送的消息
3.發(fā)送完消息后接下來就是消費消息了,定義接收消息的業(yè)務
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { //發(fā)布訂閱模式: @RabbitListener可以指定當前方法監(jiān)控哪一個隊列 @RabbitListener(queues = "fanout_queue_email")//消費者可以消費多個隊列的消息 public void subConsumerEmail(Message message){ //當隊列中有內容是方法會自動執(zhí)行 推薦Object來接收 //官網推薦Message byte[] body = message.getBody();//Message將數據存放在body中 String msg = new String(body); System.out.println("郵件業(yè)務接收到消息:"+msg); } @RabbitListener(queues = "fanout_queue_sms") public void subConsumerSms(Message message){ byte[] body = message.getBody(); String msg = new String(body); System.out.println("短信業(yè)務接收到消息:"+msg); } }
4.重新運行發(fā)送端就可以接收到我們發(fā)送的數據,接收的數據可能打印在任意一個控制臺中,這是idea的機制,我們不需要管
基于配置類的方式
1.在config配置類中定義
import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //定制JSON格式的消息轉化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } // 1.fanout創(chuàng)建一個交換機 @Bean public Exchange fanoutExchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定義消息隊列 @Bean public Queue fanoutQueueEmail(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanoutQueueSms(){ return new Queue("fanout_queue_sms"); } //3.將創(chuàng)建的隊列綁定到對應的交換機上 @Bean public Binding bingingEmail(){ return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs(); } @Bean public Binding bingingSms(){ return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs(); } }
2.為了避免api的影響,我們可以在可視化端口將基于api創(chuàng)建的交換機和隊列刪除
1)刪除交換機
2)刪除隊列,前面也是點擊隊列的名字
可以看到我已經將交換機和消息隊列都已經刪除,接下來我們重新啟動項目 ,配置類可以在啟動的時候自動創(chuàng)建
我們的訂閱發(fā)布模式也是可以正常運行
基于注解類的方式
1.我們要現將基于配置類的方式注釋掉,避免影響我們測試
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout_queue_email"), exchange=@Exchange(value = "fanout_exchange",type = "fanout") )) public void subConsumerEmail(Message message){ //當隊列中有內容是方法會自動執(zhí)行 推薦Object來接收 //官網推薦Message byte[] body = message.getBody();//Message將數據存放在body中 String msg = new String(body); System.out.println("郵件業(yè)務接收到消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout_queue_sms"), exchange=@Exchange(value = "fanout_exchange",type = "fanout") )) public void subConsumerSms(Message message){ byte[] body = message.getBody(); String msg = new String(body); System.out.println("短信業(yè)務接收到消息:"+msg); } }
提前將交換機和隊列刪除,然后運行,就會發(fā)現會在啟動時會自動生成交換機和隊列,測試也不會有影響
到此這篇關于SpringBoot實現RabbitMQ三種使用方式的文章就介紹到這了,更多相關SpringBoot RabbitMQ使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring @Configuration和@Component的區(qū)別
今天小編就為大家分享一篇關于Spring @Configuration和@Component的區(qū)別,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12Java中的Runnable,Callable,Future,FutureTask的比較
這篇文章主要介紹了Java中的Runnable,Callable,Future,FutureTask的比較的相關資料,需要的朋友可以參考下2017-02-02