欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot+RabbitMq具體使用的幾種姿勢(shì)

 更新時(shí)間:2019年05月15日 09:50:14   作者:小橘子2222  
這篇文章主要介紹了SpringBoot+RabbitMq具體使用的幾種姿勢(shì),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

目前主流的消息中間件有activemq,rabbitmq,rocketmq,kafka,我們要根據(jù)實(shí)際的業(yè)務(wù)場(chǎng)景來選擇一款合適的消息中間件,關(guān)注的主要指標(biāo)有,消息投遞的可靠性,可維護(hù)性,吞吐量以及中間件的特色等重要指標(biāo)來選擇,大數(shù)據(jù)領(lǐng)域肯定是kafka,那么傳統(tǒng)的業(yè)務(wù)場(chǎng)景就是解耦,異步,削峰。那么就在剩下的3款產(chǎn)品中選擇一款,從吞吐量,社區(qū)的活躍度,消息的可靠性出發(fā),一般的中小型公司選擇rabbitmq來說可能更為合適。那么我們就來看看如何使用它吧。

環(huán)境準(zhǔn)備

本案例基于springboot集成rabbitmq,本案例主要側(cè)重要實(shí)際的code,對(duì)于基礎(chǔ)理論知識(shí)請(qǐng)自行百度。

jdk-version:1.8

rabbitmq-version:3.7

springboot-version:2.1.4.RELEASE

pom文件

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置文件

spring:
 rabbitmq:
 password: guest
 username: guest
 port: 5672
 addresses: 127.0.0.1
 #開啟發(fā)送失敗返回
 publisher-returns: true
 #開啟發(fā)送確認(rèn)
 publisher-confirms: true
 listener:
  simple:
  #指定最小的消費(fèi)者數(shù)量.
  concurrency: 2
  #指定最大的消費(fèi)者數(shù)量.
  max-concurrency: 2
  #開啟ack
  acknowledge-mode: auto
  #開啟ack
  direct:
  acknowledge-mode: auto
 #支持消息的確認(rèn)與返回
 template:
  mandatory: true

配置rabbitMq的姿勢(shì)

姿勢(shì)一

基于javaconfig

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description rabbitMq配置類
 * @Author lly
 * @Date 2019-05-13 15:05
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {

 public final static String DIRECT_QUEUE = "directQueue";
 public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
 public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
 public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
 public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

 public final static String TOPIC_EXCHANGE = "topic_exchange";
 public final static String FANOUT_EXCHANGE = "fanout_exchange";

 public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
 public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

// direct模式隊(duì)列
 @Bean
 public Queue directQueue() {
  return new Queue(DIRECT_QUEUE, true);
 }
// topic 訂閱者模式隊(duì)列
 @Bean
 public Queue topicQueueOne() {
  return new Queue(TOPIC_QUEUE_ONE, true);
 }
 @Bean
 public Queue topicQueueTwo() {
  return new Queue(TOPIC_QUEUE_TWO, true);
 }
// fanout 廣播者模式隊(duì)列
 @Bean
 public Queue fanoutQueueOne() {
  return new Queue(FANOUT_QUEUE_ONE, true);
 }
 @Bean
 public Queue fanoutQueueTwo() {
  return new Queue(FANOUT_QUEUE_TWO, true);
 }
// topic 交換器
 @Bean
 public TopicExchange topExchange() {
  return new TopicExchange(TOPIC_EXCHANGE);
 }
// fanout 交換器
 @Bean
 public FanoutExchange fanoutExchange() {
  return new FanoutExchange(FANOUT_EXCHANGE);
 }

// 訂閱者模式綁定
 @Bean
 public Binding topExchangeBingingOne() {
  return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
 }

 @Bean
 public Binding topicExchangeBingingTwo() {
  return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
 }
// 廣播模式綁定
 @Bean
 public Binding fanoutExchangeBingingOne() {
  return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
 }
 @Bean
 public Binding fanoutExchangeBingingTwo() {
  return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
 }
}

姿勢(shì)二

基于注解

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;


/**
 * @ClassName MQTest
 * @Description 消息隊(duì)列測(cè)試
 * @Author lly
 * @Date 2019-05-13 10:50
 * @Version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

 private final static String QUEUE = "test_queue";

 @Autowired
 private AmqpTemplate amqpTemplate;

 @Autowired
 private RabbitTemplate rabbitTemplate;

 public MQTest(RabbitTemplate rabbitTemplate) {
  rabbitTemplate.setConfirmCallback(this);
  rabbitTemplate.setReturnCallback(this);
 }

 public void sendMq() {
  rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
  log.info("發(fā)送消息:{}", "test_queue" + LocalTime.now());
 }


 public void sendMqRabbit() {
  //回調(diào)id
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
//  rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測(cè)試",cId);
  Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測(cè)試", cId);
  log.info("發(fā)送消息:{},object:{}", "廣播者模式測(cè)試" + LocalTime.now(), object);
 }

 //發(fā)送訂閱者模式
 public void sendMqExchange() {
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
  CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
  log.info("訂閱者模式->發(fā)送消息:routing_key_one");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
  log.info("訂閱者模式->發(fā)送消息routing_key_two");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
 }
 //如果不存在,自動(dòng)創(chuàng)建隊(duì)列
 @RabbitListener(queuesToDeclare = @Queue("test_queue"))
 public void receiverMq(String msg) {
  log.info("接收到隊(duì)列消息:{}", msg);
 }
  //如果不存在,自動(dòng)創(chuàng)建隊(duì)列和交換器并且綁定
 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_one")})
 public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

  long deliveryTag = message.getMessageProperties().getDeliveryTag();

  try {
   log.info("接收到topic_routing_key_one消息:{}", msg);
   //發(fā)生異常
   log.error("發(fā)生異常");
   int i = 1 / 0;
   //告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉 這樣以后就不會(huì)再發(fā)了 否則消息服務(wù)器以為這條消息沒處理掉 后續(xù)還會(huì)在發(fā)
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   log.error("接收消息失敗,重新放回隊(duì)列");
   //requeu,為true,代表重新放入隊(duì)列多次失敗重新放回會(huì)導(dǎo)致隊(duì)列堵塞或死循環(huán)問題,
   // 解決方案,剔除此消息,然后記錄到db中去補(bǔ)償
   //channel.basicNack(deliveryTag, false, true);
   //拒絕消息
   //channel.basicReject(deliveryTag, true);
  }
 }

 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_two")})
 public void receiverMqExchageTwo(String msg) {
  log.info("接收到topic_routing_key_two消息:{}", msg);
 }


 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
 public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
  long deliveryTag = message.getMessageProperties().getDeliveryTag();
  try {
   log.info("接收到隊(duì)列fanout_queue_one消息:{}", msg);
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   e.printStackTrace();
   //多次失敗重新放回會(huì)導(dǎo)致隊(duì)列堵塞或死循環(huán)問題 丟棄這條消息
//   channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
   log.error("接收消息失敗");
  }
 }

 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
 public void receiverMqFanoutTwo(String msg) {
  log.info("接收到隊(duì)列fanout_queue_two消息:{}", msg);
 }

 /**
  * @return
  * @Author lly
  * @Description 確認(rèn)消息是否發(fā)送到exchange
  * @Date 2019-05-14 15:36
  * @Param [correlationData, ack, cause]
  **/
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  log.info("消息唯一標(biāo)識(shí)id:{}", correlationData);
  log.info("消息確認(rèn)結(jié)果!");
  log.error("消息失敗原因,cause:{}", cause);
 }
 /**
  * @return
  * @Author lly
  * @Description 消息消費(fèi)發(fā)生異常時(shí)返回
  * @Date 2019-05-14 16:22
  * @Param [message, replyCode, replyText, exchange, routingKey]
  **/
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  log.info("消息發(fā)送失敗id:{}", message.getMessageProperties().getCorrelationId());
  log.info("消息主體 message : ", message);
  log.info("消息主體 message : ", replyCode);
  log.info("描述:" + replyText);
  log.info("消息使用的交換器 exchange : ", exchange);
  log.info("消息使用的路由鍵 routing : ", routingKey);
 }
}

rabbitMq消息確認(rèn)的三種方式

# 發(fā)送消息后直接確認(rèn)消息
acknowledge-mode:none
# 根據(jù)消息消費(fèi)的情況,智能判定消息的確認(rèn)情況
acknowledge-mode:auto
# 手動(dòng)確認(rèn)消息的情況
acknowledge-mode:manual

我們以topic模式來試驗(yàn)下消息的ack

自動(dòng)確認(rèn)消息模式

手動(dòng)確認(rèn)消息模式

然后我們?cè)俅蜗M(fèi)消息,發(fā)現(xiàn)消息是沒有被確認(rèn)的,所以可以被再次消費(fèi)

發(fā)現(xiàn)同樣的消息還是存在的沒有被隊(duì)列刪除,必須手動(dòng)去ack,我們修改隊(duì)列1的手動(dòng)ack看看效果

channel.basicAck(deliveryTag, false);

重啟項(xiàng)目再次消費(fèi)消息

再次查看隊(duì)列里的消息,發(fā)現(xiàn)隊(duì)列01里的消息被刪除了,隊(duì)列02的還是存在。

消費(fèi)消息發(fā)生異常的情況,修改代碼 模擬發(fā)生異常的情況下發(fā)生了什么, 異常發(fā)生了,消息被重放進(jìn)了隊(duì)列

但是會(huì)導(dǎo)致消息不停的循環(huán)消費(fèi),然后失敗,致死循環(huán)調(diào)用大量服務(wù)器資源

所以我們正確的處理方式是,發(fā)生異常,將消息記錄到db,再通過補(bǔ)償機(jī)制來補(bǔ)償消息,或者記錄消息的重復(fù)次數(shù),進(jìn)行重試,超過幾次后再放到db中。

總結(jié)

通過實(shí)際的code我們了解的rabbitmq在項(xiàng)目的具體的整合情況,消息ack的幾種情況,方便在實(shí)際的場(chǎng)景中選擇合適的方案來使用。如有不足,還望不吝賜教。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 深入理解hibernate的三種狀態(tài)

    深入理解hibernate的三種狀態(tài)

    本篇文章主要介紹了深入理解hibernate的三種狀態(tài) ,主要包括了transient(瞬時(shí)狀態(tài)),persistent(持久化狀態(tài))以及detached(離線狀態(tài)),有興趣的同學(xué)可以了解一下
    2017-05-05
  • 前端和后端時(shí)間不一致問題解決的實(shí)踐指南

    前端和后端時(shí)間不一致問題解決的實(shí)踐指南

    這篇文章主要給大家介紹了關(guān)于前端和后端時(shí)間不一致問題解決的實(shí)踐指南,在SpringBoot項(xiàng)目中,可以通過設(shè)置application.yml文件中的屬性來統(tǒng)一時(shí)間格式和時(shí)區(qū),從而確保序列化和反序列化過程中的時(shí)間和時(shí)區(qū)一致性,需要的朋友可以參考下
    2025-01-01
  • Java實(shí)現(xiàn)圖形界面計(jì)算器

    Java實(shí)現(xiàn)圖形界面計(jì)算器

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)圖形界面計(jì)算器,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • SpringBoot實(shí)現(xiàn)單元測(cè)試示例詳解

    SpringBoot實(shí)現(xiàn)單元測(cè)試示例詳解

    單元測(cè)試(unit testing),是指對(duì)軟件中的最小可測(cè)試單元進(jìn)行檢查和驗(yàn)證。這篇文章主要為大家介紹了C語言實(shí)現(xiàn)單元測(cè)試的方法,需要的可以參考一下
    2022-11-11
  • 分享Java死鎖的4種排查工具

    分享Java死鎖的4種排查工具

    這篇文章主要介紹了分享Java死鎖的4種排查工具,死鎖指的是兩個(gè)或兩個(gè)以上的運(yùn)算單元,都在等待對(duì)方停止執(zhí)行,以取得系統(tǒng)資源,但是沒有一方提前退出,就稱為死鎖,下文更多相關(guān)內(nèi)容需要的小伙伴可以參考一下
    2022-05-05
  • SpringBoot之使用Feign實(shí)現(xiàn)微服務(wù)間的交互

    SpringBoot之使用Feign實(shí)現(xiàn)微服務(wù)間的交互

    這篇文章主要介紹了SpringBoot中使用Feign實(shí)現(xiàn)微服務(wù)間的交互,對(duì)微服務(wù)這方面感興趣的小伙伴可以參考閱讀本文
    2023-03-03
  • Java父類繼承中的static和final用法

    Java父類繼承中的static和final用法

    這篇文章主要介紹了Java父類繼承中的static和final用法說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-10-10
  • 淺談Java中的LinkedHashSet哈希鏈表

    淺談Java中的LinkedHashSet哈希鏈表

    這篇文章主要介紹了淺談Java中的LinkedHashSet哈希鏈表,LinkedHashSet 是 Java 中的一個(gè)集合類,它是 HashSet 的子類,并實(shí)現(xiàn)了 Set 接口,與 HashSet 不同的是,LinkedHashSet 保留了元素插入的順序,并且具有 HashSet 的快速查找特性,需要的朋友可以參考下
    2023-09-09
  • Java如何在List或Map遍歷過程中刪除元素

    Java如何在List或Map遍歷過程中刪除元素

    相信大家在日常的開發(fā)過程中,經(jīng)常需要對(duì)List或Map里面的符合某種業(yè)務(wù)的數(shù)據(jù)進(jìn)行刪除,但是如果不了解里面的機(jī)制就容易掉入“陷阱”導(dǎo)致遺漏或者程序異常。下面這篇文章將會(huì)給大家詳細(xì)介紹Java如何在List和Map遍歷過程中刪除元素,有需要的朋友們可以參考借鑒。
    2016-12-12
  • 用java的spring實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IOC容器示例代碼

    用java的spring實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IOC容器示例代碼

    本篇文章主要介紹了用java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的IOC容器示例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-03-03

最新評(píng)論