欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java搭建RabbitMq消息中間件過程詳解

 更新時間:2019年12月23日 09:26:00   作者:1024。  
這篇文章主要介紹了Java搭建RabbitMq消息中間件過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下

這篇文章主要介紹了Java搭建RabbitMq消息中間件過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下

前言

當(dāng)系統(tǒng)中出現(xiàn)“生產(chǎn)“和“消費(fèi)“的速度或穩(wěn)定性等因素不一致的時候,就需要消息隊(duì)列。

名詞

  • exchange: 交換機(jī)
  • routingkey: 路由key
  • queue:隊(duì)列

控制臺端口:15672

  exchange和queue是需要綁定在一起的,然后消息發(fā)送到exchange再由exchange通過routingkey發(fā)送到對應(yīng)的隊(duì)列中。

使用場景

1.技能訂單3分鐘自動取消,改變狀態(tài)

2.直播開始前15分鐘提醒

3.直播狀態(tài)自動結(jié)束

流程

  生產(chǎn)者發(fā)送消息 —> order_pre_exchange交換機(jī) —> order_per_ttl_delay_queue隊(duì)列

  —> 時間到期 —> order_delay_exchange交換機(jī) —> order_delay_process_queue隊(duì)列 —> 消費(fèi)者

第一步:在pom文件中添加

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * rabbitMQ的隊(duì)列設(shè)置(生產(chǎn)者發(fā)送的消息,永遠(yuǎn)是先進(jìn)入exchange,再通過路由,轉(zhuǎn)發(fā)到隊(duì)列)
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Configuration
public class OrderQueueConfig {

  /**
   * 訂單緩沖交換機(jī)名稱
   */
  public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";

  /**
   * 發(fā)送到該隊(duì)列的message會在一段時間后過期進(jìn)入到order_delay_process_queue 【隊(duì)列里所有的message都有統(tǒng)一的失效時間】
   */
  public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";

  /**
   * 訂單的交換機(jī)DLX 名字
   */
  final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";

  /**
   * 訂單message時間過期后進(jìn)入的隊(duì)列,也就是訂單實(shí)際的消費(fèi)隊(duì)列
   */
  public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";

  /**
   * 訂單在緩沖隊(duì)列過期時間(毫秒)30分鐘
   */
  public final static int ORDER_QUEUE_EXPIRATION = 1800000;

  /**
   * 訂單緩沖交換機(jī)
   * 
   * @return
   */
  @Bean
  public DirectExchange preOrderExange() {
    return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
  }

  /**
   * 創(chuàng)建order_per_ttl_delay_queue隊(duì)列,訂單消息經(jīng)過緩沖交換機(jī),會進(jìn)入該隊(duì)列
   * 
   * @return
   */
  @Bean
  public Queue delayQueuePerOrderTTLQueue() {
    return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
        .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
        .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
        .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 設(shè)置訂單隊(duì)列的過期時間
        .build();
  }

  /**
   * 將order_pre_exchange綁定到order_pre_ttl_delay_queue隊(duì)列
   *
   * @param delayQueuePerOrderTTLQueue
   * @param preOrderExange
   * @return
   */
  @Bean
  public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
    return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
  }

  /**
   * 創(chuàng)建訂單的DLX exchange
   *
   * @return
   */
  @Bean
  public DirectExchange delayOrderExchange() {
    return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
  }

  /**
   * 創(chuàng)建order_delay_process_queue隊(duì)列,也就是訂單實(shí)際消費(fèi)隊(duì)列
   *
   * @return
   */
  @Bean
  public Queue delayProcessOrderQueue() {
    return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
  }

  /**
   * 將DLX綁定到實(shí)際消費(fèi)隊(duì)列
   *
   * @param delayProcessOrderQueue
   * @param delayExchange
   * @return
   */
  @Bean
  public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
    return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
  }

  /**
   * 監(jiān)聽訂單實(shí)際消費(fèi)者隊(duì)列order_delay_process_queue
   * 
   * @param connectionFactory
   * @param processReceiver
   * @return
   */
  @Bean
  public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
      OrderProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽order_delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
  }
}

消費(fèi)者 OrderProcessReceiver :

package com.tuohang.platform.config;

import java.util.Objects;

import org.apache.tools.ant.types.resources.selectors.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

/**
 * 訂單延遲處理消費(fèi)者
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Component
public class OrderProcessReceiver implements ChannelAwareMessageListener {

  private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
    } catch (Exception e) {
      // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會在一定延遲之后自動重做
      channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }
  
  /**
   * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received <" + realMessage + ">");
    // 取消訂單
    if(!Objects.equals(realMessage, msg)) {
//      SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
      System.out.println("測試111111-----------"+new Date());
      System.out.println(message);
    }
  }
}

或者

/**
 * 測試 rabbit 消費(fèi)者
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月25日
 */
@Component
@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
public class TestProcessReceiver {

  private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
      //告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉;否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會在發(fā)
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      // 如果發(fā)生了異常,則將該消息重定向到緩沖隊(duì)列,會在一定延遲之后自動重做
      channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }
  
  /**
   * 處理訂單消息,如果訂單未支付,取消訂單(如果當(dāng)消息內(nèi)容為FAIL_MESSAGE的話,則需要拋出異常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received < " + realMessage + " >");
    // 取消訂單
    if(!Objects.equals(realMessage, msg)) {
      System.out.println("測試111111-----------"+new Date());
    }else {
      System.out.println("rabbit else...");
    }
  }
}

生產(chǎn)者

/**
   * 測試rabbitmq
   * 
   * @return
   */
  @RequestMapping(value = "/testrab")
  public String testraa() {
    GenericResult gr = null;
    try {
      String name = "test_pre_ttl_delay_queue";
  long expiration = 10000;//10s 過期時間
      rabbitTemplate.convertAndSend(name,String.valueOf(123456));
 // 在單個消息上設(shè)置過期時間
 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));


    } catch (ServiceException e) {
      e.printStackTrace();
      gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
    }
    
    return getWrite(gr);
  }

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 深入學(xué)習(xí)java中的Groovy 和 Scala 類

    深入學(xué)習(xí)java中的Groovy 和 Scala 類

    本文將探討三種下一代 JVM 語言:Groovy、Scala 和 Clojure,比較并對比新的功能和范例,讓 Java 開發(fā)人員對自己近期的未來發(fā)展有大體的認(rèn)識。,需要的朋友可以參考下
    2019-06-06
  • Spring Boot接收單個String入?yún)⒌慕鉀Q方法

    Spring Boot接收單個String入?yún)⒌慕鉀Q方法

    這篇文章主要給大家介紹了關(guān)于Spring Boot接收單個String入?yún)⒌慕鉀Q方法,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用spring boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-11-11
  • SpringBoot整合Canal與RabbitMQ監(jiān)聽數(shù)據(jù)變更記錄

    SpringBoot整合Canal與RabbitMQ監(jiān)聽數(shù)據(jù)變更記錄

    這篇文章主要介紹了SpringBoot整合Canal與RabbitMQ監(jiān)聽數(shù)據(jù)變更記錄,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-09-09
  • spring boot @PathVariable傳遞帶反斜杠參數(shù) / 的處理

    spring boot @PathVariable傳遞帶反斜杠參數(shù) / 的處理

    這篇文章主要介紹了spring boot @PathVariable傳遞帶反斜杠參數(shù) / 的處理操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Java多線程案例之單例模式懶漢+餓漢+枚舉

    Java多線程案例之單例模式懶漢+餓漢+枚舉

    這篇文章主要介紹了Java多線程案例之單例模式懶漢+餓漢+枚舉,文章著重介紹在多線程的背景下簡單的實(shí)現(xiàn)單例模式,需要的小伙伴可以參考一下
    2022-06-06
  • Groovy動態(tài)語言使用教程簡介

    Groovy動態(tài)語言使用教程簡介

    這篇文章主要為大家介紹了Groovy動態(tài)語言使用教程簡介,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • 淺談Java中ArrayList線程不安全怎么辦

    淺談Java中ArrayList線程不安全怎么辦

    本文主要介紹了Java中ArrayList線程不安全怎么辦,主要有三種解決的方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Java實(shí)現(xiàn)多線程文件下載的代碼示例

    Java實(shí)現(xiàn)多線程文件下載的代碼示例

    本篇文章主要介紹了Java實(shí)現(xiàn)多線程下載的代碼示例,Java多線程可以充分利用CPU的資源,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-02-02
  • 簡單了解java類型轉(zhuǎn)換常見的錯誤

    簡單了解java類型轉(zhuǎn)換常見的錯誤

    這篇文章主要介紹了簡單了解java類型轉(zhuǎn)換常見的錯誤,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-04-04
  • 通過圖例了解PowerDesigner使用方法

    通過圖例了解PowerDesigner使用方法

    這篇文章主要介紹了通過圖例了解PowerDesigner使用方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08

最新評論