欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot整合rabbitmq的示例代碼

 更新時(shí)間:2017年12月12日 08:38:26   作者:胡運(yùn)凡  
本篇文章主要介紹了springboot整合rabbitmq的示例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

概述

  1. RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),或者簡(jiǎn)單地將作業(yè)隊(duì)列以便讓分布式服務(wù)器進(jìn)行處理。
  2. 它現(xiàn)實(shí)了AMQP協(xié)議,并且遵循Mozilla Public License開源協(xié)議,它支持多種語(yǔ)言,可以方便的和spring集成。
  3. 消息隊(duì)列使用消息將應(yīng)用程序連接起來(lái),這些消息通過(guò)像RabbitMQ這樣的消息代理服務(wù)器在應(yīng)用程序之間路由。

基本概念

Broker

用來(lái)處理數(shù)據(jù)的消息隊(duì)列服務(wù)器實(shí)體

vhost

由RabbitMQ服務(wù)器創(chuàng)建的虛擬消息主機(jī),擁有自己的權(quán)限機(jī)制,一個(gè)broker里可以開設(shè)多個(gè)vhost,用于不同用戶的權(quán)限隔離,vhost之間是也完全隔離的。

productor

產(chǎn)生用于消息通信的數(shù)據(jù)

channel

消息通道,在AMQP中可以建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

exchange

direct

轉(zhuǎn)發(fā)消息到routing-key指定的隊(duì)列

fanout

fanout

轉(zhuǎn)發(fā)消息到所有綁定的隊(duì)列,類似于一種廣播發(fā)送的方式。

topic

topic

按照規(guī)則轉(zhuǎn)發(fā)消息,這種規(guī)則多為模式匹配,也顯得更加靈活

queue

queue

  1.  隊(duì)列是RabbitMQ的內(nèi)部對(duì)象,存儲(chǔ)消息
  2. 以動(dòng)態(tài)的增加消費(fèi)者,隊(duì)列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個(gè)消費(fèi)者。

binding

表示交換機(jī)和隊(duì)列之間的關(guān)系,在進(jìn)行綁定時(shí),帶有一個(gè)額外的參數(shù)binding-key,來(lái)和routing-key相匹配。

consumer

監(jiān)聽消息隊(duì)列來(lái)進(jìn)行消息數(shù)據(jù)的讀取

springboot下三種Exchange模式(fanout,direct,topic)實(shí)現(xiàn)

pom.xml中引用spring-boot-starter-amqp

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

增加rabbitmq配置

spring: 
 rabbitmq: 
 host: localhost 
 port: 5672 
 username: guest 
 password: guest

direct

direct模式一般情況下只需要定義queue 使用自帶交換機(jī)(defaultExchange)無(wú)需綁定交換機(jī)

  @Configuration
public class RabbitP2PConfigure {  
 public static final String QUEUE_NAME = "p2p-queue";
  @Bean
  public Queue queue() {
    return new Queue(QUEUE_NAME, true);
  }

}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = BootCoreTestApplication.class)
@Slf4j
public class RabbitTest {
  @Autowired
  private AmqpTemplate amqpTemplate;

  /**
  * 發(fā)送
  */
  @Test
  public void sendLazy() throws InterruptedException {
    City city = new City(234556666L, "direct_name", "direct_code");
    amqpTemplate.convertAndSend(RabbitLazyConfigure.QUEUE_NAME, city);
  }
  
  /**
  * 領(lǐng)取
  */
  @Test
  public void receive() throws InterruptedException {
    Object obj = amqpTemplate.receiveAndConvert(RabbitLazyConfigure.QUEUE_NAME);
    Assert.notNull(obj, "");
    log.debug(obj.toString());
  }
}

適用場(chǎng)景:點(diǎn)對(duì)點(diǎn)

fanout

fanout則模式需要將多個(gè)queue綁定在同一個(gè)交換機(jī)上

@Configuration
public class RabbitFanoutConfigure {
  public static final String EXCHANGE_NAME = "fanout-exchange";
  public static final String FANOUT_A = "fanout.A";
  public static final String FANOUT_B = "fanout.B";
  public static final String FANOUT_C = "fanout.C";

  @Bean
  public Queue AMessage() {
    return new Queue(FANOUT_A);
  }

  @Bean
  public Queue BMessage() {
    return new Queue(FANOUT_B);
  }

  @Bean
  public Queue CMessage() {
    return new Queue(FANOUT_C);
  }

  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(EXCHANGE_NAME);
  }

  @Bean
  public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(AMessage).to(fanoutExchange);
  }

  @Bean
  public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(BMessage).to(fanoutExchange);
  }

  @Bean
  public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(CMessage).to(fanoutExchange);
  }

}

發(fā)送者

@Slf4j
public class Sender {

  @Autowired
  private AmqpTemplate rabbitTemplate;

  public void sendFanout(Object message) {
    log.debug("begin send fanout message<" + message + ">");
    rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME, "", message);
  }

}

我們可以通過(guò)@RabbitListener監(jiān)聽多個(gè)queue來(lái)進(jìn)行消費(fèi)

@Slf4j
@RabbitListener(queues = {
    RabbitFanoutConfigure.FANOUT_A,
    RabbitFanoutConfigure.FANOUT_B,
    RabbitFanoutConfigure.FANOUT_C
})
public class Receiver {

  @RabbitHandler
  public void receiveMessage(String message) {
    log.debug("Received <" + message + ">");
  }
}

適用場(chǎng)景
- 大規(guī)模多用戶在線(MMO)游戲可以使用它來(lái)處理排行榜更新等全局事件
- 體育新聞網(wǎng)站可以用它來(lái)近乎實(shí)時(shí)地將比分更新分發(fā)給移動(dòng)客戶端
- 分發(fā)系統(tǒng)使用它來(lái)廣播各種狀態(tài)和配置更新
- 在群聊的時(shí)候,它被用來(lái)分發(fā)消息給參與群聊的用戶

topic

這種模式較為復(fù)雜,簡(jiǎn)單來(lái)說(shuō),就是每個(gè)隊(duì)列都有其關(guān)心的主題,所有的消息都帶有一個(gè)“標(biāo)題”,Exchange會(huì)將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊(duì)列。

在進(jìn)行綁定時(shí),要提供一個(gè)該隊(duì)列關(guān)心的主題,如“topic.# (“#”表示0個(gè)或若干個(gè)關(guān)鍵字,“*”表示一個(gè)關(guān)鍵字。 )

@Configuration
public class RabbitTopicConfigure {
  public static final String EXCHANGE_NAME = "topic-exchange";
  public static final String TOPIC = "topic";
  public static final String TOPIC_A = "topic.A";
  public static final String TOPIC_B = "topic.B";

  @Bean
  public Queue queueTopic() {
    return new Queue(RabbitTopicConfigure.TOPIC);
  }

  @Bean
  public Queue queueTopicA() {
    return new Queue(RabbitTopicConfigure.TOPIC_A);
  }

  @Bean
  public Queue queueTopicB() {
    return new Queue(RabbitTopicConfigure.TOPIC_B);
  }

  @Bean
  public TopicExchange exchange() {
    TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
    topicExchange.setDelayed(true);
    return new TopicExchange(EXCHANGE_NAME);
  }

  @Bean
  public Binding bindingExchangeTopic(Queue queueTopic, TopicExchange exchange) {
    return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC);
  }

  @Bean
  public Binding bindingExchangeTopics(Queue queueTopicA, TopicExchange exchange) {
    return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#");
  }
}

同時(shí)去監(jiān)聽三個(gè)queue

@Slf4j
@RabbitListener(queues = {
    RabbitTopicConfigure.TOPIC,
    RabbitTopicConfigure.TOPIC_A,
    RabbitTopicConfigure.TOPIC_B
})
public class Receiver {
  @RabbitHandler
  public void receiveMessage(String message) {
    log.debug("Received <" + message + ">");
  }
}

通過(guò)測(cè)試我們可以發(fā)現(xiàn)

@RunWith(SpringRunner.class)
@SpringBootTest(classes = BootCoreTestApplication.class)
public class RabbitTest {
  @Autowired
  private AmqpTemplate rabbitTemplate;

  @Test
  public void sendAll() {
    rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, "topic.test", "send All");
  }

  @Test
  public void sendTopic() {
    rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC, "send Topic");
  }

  @Test
  public void sendTopicA() {
    rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC_A, "send TopicA");
  }
}

適用場(chǎng)景
- 分發(fā)有關(guān)于特定地理位置的數(shù)據(jù),例如銷售點(diǎn)
- 由多個(gè)工作者(workers)完成的后臺(tái)任務(wù),每個(gè)工作者負(fù)責(zé)處理某些特定的任務(wù)
- 股票價(jià)格更新(以及其他類型的金融數(shù)據(jù)更新)
- 涉及到分類或者標(biāo)簽的新聞更新(例如,針對(duì)特定的運(yùn)動(dòng)項(xiàng)目或者隊(duì)伍)
- 云端的不同種類服務(wù)的協(xié)調(diào)
- 分布式架構(gòu)/基于系統(tǒng)的軟件封裝,其中每個(gè)構(gòu)建者僅能處理一個(gè)特定的架構(gòu)或者系統(tǒng)。

延遲隊(duì)列

延遲消費(fèi):

  1. 如用戶生成訂單之后,需要過(guò)一段時(shí)間校驗(yàn)訂單的支付狀態(tài),如果訂單仍未支付則需要及時(shí)地關(guān)閉訂單。
  2. 用戶注冊(cè)成功之后,需要過(guò)一段時(shí)間比如一周后校驗(yàn)用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來(lái)提醒用戶使用。

延遲重試:

  1. 如消費(fèi)者從隊(duì)列里消費(fèi)消息時(shí)失敗了,但是想要延遲一段時(shí)間后自動(dòng)重試。
  2. 如果不使用延遲隊(duì)列,那么我們只能通過(guò)一個(gè)輪詢掃描程序去完成。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務(wù)便于開發(fā)人員使用。但是使用延遲隊(duì)列的話,我們就可以輕而易舉地完成。

設(shè)置交換機(jī)延遲屬性為true

@Configuration
public class RabbitLazyConfigure {
  public static final String QUEUE_NAME = "lazy-queue-t";
  public static final String EXCHANGE_NAME = "lazy-exchange-t";

  @Bean
  public Queue queue() {
    return new Queue(QUEUE_NAME, true);
  }

  @Bean
  public DirectExchange defaultExchange() {
    DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false);
    directExchange.setDelayed(true);
    return directExchange;
  }

  @Bean
  public Binding binding() {
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME);
  }

}

發(fā)送時(shí)設(shè)置延遲時(shí)間即可

@Slf4j
public class Sender {
  @Autowired
  private AmqpTemplate rabbitTemplate;
  public void sendLazy(Object msg) {
    log.debug("begin send lazy message<" + msg + ">");
    rabbitTemplate.convertAndSend(RabbitLazyConfigure.EXCHANGE_NAME,
        RabbitLazyConfigure.QUEUE_NAME, msg, message -> {
          message.getMessageProperties().setHeader("x-delay", 10000);
          return message;
        }
    );
  }
}

結(jié)束

各種使用案例請(qǐng)直接查看官方文檔

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 淺聊java8中數(shù)值流的使用

    淺聊java8中數(shù)值流的使用

    java8為我提供的簡(jiǎn)單快捷的數(shù)值流計(jì)算API,本文就基于幾個(gè)常見的場(chǎng)景介紹一下數(shù)值流API的使用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下
    2023-10-10
  • JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐

    JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐

    本文主要介紹了JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-09-09
  • MyBatis-Plus多數(shù)據(jù)源的示例代碼

    MyBatis-Plus多數(shù)據(jù)源的示例代碼

    本文主要介紹了MyBatis-Plus多數(shù)據(jù)源的示例代碼,包括依賴配置、數(shù)據(jù)源配置、Mapper 和 Service 的定義,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-05-05
  • Java注解簡(jiǎn)介和使用詳細(xì)講解

    Java注解簡(jiǎn)介和使用詳細(xì)講解

    Java 語(yǔ)言中的類、構(gòu)造器、方法、成員變量、參數(shù)等都可以被注解進(jìn)行標(biāo)注,這篇文章主要介紹了Java注解的介紹和使用詳細(xì)講解,需要的朋友可以參考下
    2023-02-02
  • Java優(yōu)秀類庫(kù)Hutool使用示例

    Java優(yōu)秀類庫(kù)Hutool使用示例

    Hutool是一個(gè)小而全的Java工具類庫(kù),通過(guò)靜態(tài)方法封裝,降低相關(guān)API的學(xué)習(xí)成本,提高工作效率,涵蓋了Java開發(fā)開發(fā)中的方方面面,使用Hutool可節(jié)省開發(fā)人員對(duì)項(xiàng)目中公用類和公用工具方法的封裝時(shí)間,使開發(fā)專注于業(yè)務(wù),同時(shí)可以最大限度的避免封裝不完善帶來(lái)的bug
    2023-02-02
  • SpringBoot中如何進(jìn)行統(tǒng)一異常處理

    SpringBoot中如何進(jìn)行統(tǒng)一異常處理

    大家好,本篇文章主要講的是SpringBoot中如何進(jìn)行統(tǒng)一異常處理,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下
    2022-02-02
  • 基于Java的電梯系統(tǒng)實(shí)現(xiàn)過(guò)程

    基于Java的電梯系統(tǒng)實(shí)現(xiàn)過(guò)程

    這篇文章主要介紹了基于Java的電梯系統(tǒng)實(shí)現(xiàn)過(guò)程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • org.apache.ibatis.annotations不存在的問(wèn)題

    org.apache.ibatis.annotations不存在的問(wèn)題

    這篇文章主要介紹了org.apache.ibatis.annotations不存在的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • JAVA新手學(xué)習(xí)篇之類和對(duì)象詳解

    JAVA新手學(xué)習(xí)篇之類和對(duì)象詳解

    這篇文章主要給大家介紹了關(guān)于JAVA新手學(xué)習(xí)篇之類和對(duì)象的相關(guān)資料,Java是面向?qū)ο蟮木幊陶Z(yǔ)言,主旨在于通過(guò)對(duì)象封裝屬性和方法實(shí)現(xiàn)功能,面向?qū)ο笈c面向過(guò)程的區(qū)別在于關(guān)注點(diǎn)的不同,需要的朋友可以參考下
    2024-10-10
  • SpringMVC日期類型接收空值異常問(wèn)題解決方法

    SpringMVC日期類型接收空值異常問(wèn)題解決方法

    這篇文章主要介紹了SpringMVC日期類型接收空值異常問(wèn)題解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10

最新評(píng)論