springboot整合rabbitmq的示例代碼
概述
- RabbitMQ是一個開源的消息代理和隊列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),或者簡單地將作業(yè)隊列以便讓分布式服務(wù)器進(jìn)行處理。
- 它現(xiàn)實(shí)了AMQP協(xié)議,并且遵循Mozilla Public License開源協(xié)議,它支持多種語言,可以方便的和spring集成。
- 消息隊列使用消息將應(yīng)用程序連接起來,這些消息通過像RabbitMQ這樣的消息代理服務(wù)器在應(yīng)用程序之間路由。
基本概念
Broker
用來處理數(shù)據(jù)的消息隊列服務(wù)器實(shí)體
vhost
由RabbitMQ服務(wù)器創(chuàng)建的虛擬消息主機(jī),擁有自己的權(quán)限機(jī)制,一個broker里可以開設(shè)多個vhost,用于不同用戶的權(quán)限隔離,vhost之間是也完全隔離的。
productor
產(chǎn)生用于消息通信的數(shù)據(jù)
channel
消息通道,在AMQP中可以建立多個channel,每個channel代表一個會話任務(wù)。
exchange
direct
轉(zhuǎn)發(fā)消息到routing-key指定的隊列
fanout
fanout
轉(zhuǎn)發(fā)消息到所有綁定的隊列,類似于一種廣播發(fā)送的方式。
topic
topic
按照規(guī)則轉(zhuǎn)發(fā)消息,這種規(guī)則多為模式匹配,也顯得更加靈活
queue
queue
- 隊列是RabbitMQ的內(nèi)部對象,存儲消息
- 以動態(tài)的增加消費(fèi)者,隊列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個消費(fèi)者。
binding
表示交換機(jī)和隊列之間的關(guān)系,在進(jìn)行綁定時,帶有一個額外的參數(shù)binding-key,來和routing-key相匹配。
consumer
監(jiān)聽消息隊列來進(jìn)行消息數(shù)據(jù)的讀取
springboot下三種Exchange模式(fanout,direct,topic)實(shí)現(xiàn)
pom.xml中引用spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
增加rabbitmq配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
direct
direct模式一般情況下只需要定義queue 使用自帶交換機(jī)(defaultExchange)無需綁定交換機(jī)
@Configuration public class RabbitP2PConfigure { public static final String QUEUE_NAME = "p2p-queue"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } }
@RunWith(SpringRunner.class) @SpringBootTest(classes = BootCoreTestApplication.class) @Slf4j public class RabbitTest { @Autowired private AmqpTemplate amqpTemplate; /** * 發(fā)送 */ @Test public void sendLazy() throws InterruptedException { City city = new City(234556666L, "direct_name", "direct_code"); amqpTemplate.convertAndSend(RabbitLazyConfigure.QUEUE_NAME, city); } /** * 領(lǐng)取 */ @Test public void receive() throws InterruptedException { Object obj = amqpTemplate.receiveAndConvert(RabbitLazyConfigure.QUEUE_NAME); Assert.notNull(obj, ""); log.debug(obj.toString()); } }
適用場景:點(diǎn)對點(diǎn)
fanout
fanout則模式需要將多個queue綁定在同一個交換機(jī)上
@Configuration public class RabbitFanoutConfigure { public static final String EXCHANGE_NAME = "fanout-exchange"; public static final String FANOUT_A = "fanout.A"; public static final String FANOUT_B = "fanout.B"; public static final String FANOUT_C = "fanout.C"; @Bean public Queue AMessage() { return new Queue(FANOUT_A); } @Bean public Queue BMessage() { return new Queue(FANOUT_B); } @Bean public Queue CMessage() { return new Queue(FANOUT_C); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
發(fā)送者
@Slf4j public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendFanout(Object message) { log.debug("begin send fanout message<" + message + ">"); rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME, "", message); } }
我們可以通過@RabbitListener監(jiān)聽多個queue來進(jìn)行消費(fèi)
@Slf4j @RabbitListener(queues = { RabbitFanoutConfigure.FANOUT_A, RabbitFanoutConfigure.FANOUT_B, RabbitFanoutConfigure.FANOUT_C }) public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); } }
適用場景
- 大規(guī)模多用戶在線(MMO)游戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網(wǎng)站可以用它來近乎實(shí)時地將比分更新分發(fā)給移動客戶端
- 分發(fā)系統(tǒng)使用它來廣播各種狀態(tài)和配置更新
- 在群聊的時候,它被用來分發(fā)消息給參與群聊的用戶
topic
這種模式較為復(fù)雜,簡單來說,就是每個隊列都有其關(guān)心的主題,所有的消息都帶有一個“標(biāo)題”,Exchange會將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊列。
在進(jìn)行綁定時,要提供一個該隊列關(guān)心的主題,如“topic.# (“#”表示0個或若干個關(guān)鍵字,“*”表示一個關(guān)鍵字。 )
@Configuration public class RabbitTopicConfigure { public static final String EXCHANGE_NAME = "topic-exchange"; public static final String TOPIC = "topic"; public static final String TOPIC_A = "topic.A"; public static final String TOPIC_B = "topic.B"; @Bean public Queue queueTopic() { return new Queue(RabbitTopicConfigure.TOPIC); } @Bean public Queue queueTopicA() { return new Queue(RabbitTopicConfigure.TOPIC_A); } @Bean public Queue queueTopicB() { return new Queue(RabbitTopicConfigure.TOPIC_B); } @Bean public TopicExchange exchange() { TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME); topicExchange.setDelayed(true); return new TopicExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeTopic(Queue queueTopic, TopicExchange exchange) { return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC); } @Bean public Binding bindingExchangeTopics(Queue queueTopicA, TopicExchange exchange) { return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#"); } }
同時去監(jiān)聽三個queue
@Slf4j @RabbitListener(queues = { RabbitTopicConfigure.TOPIC, RabbitTopicConfigure.TOPIC_A, RabbitTopicConfigure.TOPIC_B }) public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); } }
通過測試我們可以發(fā)現(xiàn)
@RunWith(SpringRunner.class) @SpringBootTest(classes = BootCoreTestApplication.class) public class RabbitTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendAll() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, "topic.test", "send All"); } @Test public void sendTopic() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC, "send Topic"); } @Test public void sendTopicA() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC_A, "send TopicA"); } }
適用場景
- 分發(fā)有關(guān)于特定地理位置的數(shù)據(jù),例如銷售點(diǎn)
- 由多個工作者(workers)完成的后臺任務(wù),每個工作者負(fù)責(zé)處理某些特定的任務(wù)
- 股票價格更新(以及其他類型的金融數(shù)據(jù)更新)
- 涉及到分類或者標(biāo)簽的新聞更新(例如,針對特定的運(yùn)動項目或者隊伍)
- 云端的不同種類服務(wù)的協(xié)調(diào)
- 分布式架構(gòu)/基于系統(tǒng)的軟件封裝,其中每個構(gòu)建者僅能處理一個特定的架構(gòu)或者系統(tǒng)。
延遲隊列
延遲消費(fèi):
- 如用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態(tài),如果訂單仍未支付則需要及時地關(guān)閉訂單。
- 用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來提醒用戶使用。
延遲重試:
- 如消費(fèi)者從隊列里消費(fèi)消息時失敗了,但是想要延遲一段時間后自動重試。
- 如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務(wù)便于開發(fā)人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。
設(shè)置交換機(jī)延遲屬性為true
@Configuration public class RabbitLazyConfigure { public static final String QUEUE_NAME = "lazy-queue-t"; public static final String EXCHANGE_NAME = "lazy-exchange-t"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean public DirectExchange defaultExchange() { DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false); directExchange.setDelayed(true); return directExchange; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME); } }
發(fā)送時設(shè)置延遲時間即可
@Slf4j public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendLazy(Object msg) { log.debug("begin send lazy message<" + msg + ">"); rabbitTemplate.convertAndSend(RabbitLazyConfigure.EXCHANGE_NAME, RabbitLazyConfigure.QUEUE_NAME, msg, message -> { message.getMessageProperties().setHeader("x-delay", 10000); return message; } ); } }
結(jié)束
各種使用案例請直接查看官方文檔
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- springboot實(shí)現(xiàn)rabbitmq的隊列初始化和綁定
- SpringBoot+RabbitMq具體使用的幾種姿勢
- SpringBoot使用RabbitMQ延時隊列(小白必備)
- springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗)
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot中連接多個RabbitMQ的方法詳解
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot整合RabbitMQ之路由模式的實(shí)現(xiàn)
相關(guān)文章
Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之時間復(fù)雜度與空間復(fù)雜度
對于一個算法,其時間復(fù)雜度和空間復(fù)雜度往往是相互影響的,當(dāng)追求一個較好的時間復(fù)雜度時,可能會使空間復(fù)雜度的性能變差,即可能導(dǎo)致占用較多的存儲空間,這篇文章主要給大家介紹了關(guān)于Java時間復(fù)雜度、空間復(fù)雜度的相關(guān)資料,需要的朋友可以參考下2022-02-02關(guān)于Spring Bean實(shí)例過程中使用反射和遞歸處理的Bean屬性填充問題
本文帶領(lǐng)大家一起學(xué)習(xí)下在Spring Bean實(shí)例過程中如何使用反射和遞歸處理的Bean屬性填充,需要在類 AbstractAutowireCapableBeanFactory 的 createBean 方法中添加補(bǔ)全屬性方法,具體操作方法跟隨小編一起學(xué)習(xí)下吧2021-06-06Java Swing BoxLayout箱式布局的實(shí)現(xiàn)代碼
這篇文章主要介紹了Java Swing BoxLayout箱式布局的實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12Springboot整合spring-boot-starter-data-elasticsearch的過程
本文詳細(xì)介紹了Springboot整合spring-boot-starter-data-elasticsearch的過程,包括版本要求、依賴添加、實(shí)體類添加、索引的名稱、分片、副本設(shè)置等,同時,還介紹了如何使用ElasticsearchRepository類進(jìn)行增刪改查操作2024-10-10