欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合RabbitMQ, 實現(xiàn)生產(chǎn)者與消費者的功能

 更新時間:2021年03月13日 09:23:05   作者:skypyb  
這篇文章主要介紹了SpringBoot整合RabbitMQ, 實現(xiàn)生產(chǎn)者與消費者的功能,幫助大家更好得理解和學習使用SpringBoot框架,感興趣的朋友可以了解下

自然,依賴是少不了的。除了spring-boot-starter-web依賴外。
就這個是最主要的依賴了,其他的看著辦就是了。我用的是gradle,用maven的看著弄也一樣的。無非就是包+包名+版本

//AMQP
compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE')

這里有一個坑。導致我后來發(fā)送消息時一直連不上去。報錯: java.net.SocketException: socket closed。
我去網(wǎng)上尋找了許多方案。大致都是一個意思。沒有設置遠程連接權(quán)限。讓我添加一個用戶,并且設置最大權(quán)限。
 
下面是添加rabbitmq用戶的命令

#rabbitmqctl add_user 賬號 密碼
rabbitmqctl add_user admin 614
#分配用戶標簽(admin為要賦予administrator權(quán)限的剛創(chuàng)建的那個賬號的名字)
rabbitmqctl set_user_tags admin administrator
#設置權(quán)限,開啟遠程訪問
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

我用完之后去管控臺(http://ip:15672)看了一下用戶列表。確實已經(jīng)添加上去了,也是最大權(quán)限。
然鵝并沒有什么卵用
后來強行摸索出來了,原來是版本差異的原因。我SpringBoot本來是使用的是2.0.3版本,然后AMQP我使用的是2.0.4。可能有什么不兼容的地方。
把Springboot和AMQP的版本給同步成一個就好了。別的版本差一點根本沒啥問題,就AMQP特殊,也是醉了。
 
 
使用SpriongBoot的yml配置:重點是rabbitmq那一欄
設置好登錄用戶、密碼、地址端口、虛擬地址、超時時間就可以了

server:
 port: 8080
 servlet:
 context-path: /
spring:
 http:
 encoding:
  charset: UTF-8
 jackson:
  #前端頁面?zhèn)鱀ate值時格式化
  date-format: yyyy-MM-dd HH:mm:ss
  time-zone: GMT+8
 datasource:
  driver-class-name: com.mysql.cj.jdbc.Driver
  url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai
  username: root
  password: 614
 rabbitmq:
 port: 5672
 host: 192.168.194.128
 username: admin
 password: 614
 virtual-host: /
 connection-timeout: 15s
#Redis配置
 redis:
 host: 192.168.194.128
 port: 6379
 #Redis連接池配置
 jedis:
  pool:
  min-idle: 0
  max-idle: 8
  max-active: 8
  max-wait: -1ms

這里又有個小坑,這個rabbitmq的超時時間(connection-timeout)配的我真的是醉了,我看的教程里寫的是15000,表示15秒,我一輸之后IDEA直接報紅線啊。
網(wǎng)上一找,全特么用毫秒值配的,行吧,應該我們用的不是一個版本。
點開看下這參數(shù)接受一個java.time.Duration對象,百思不得其解。這玩意咋配?我不會啊。找了二十分鐘的攻略才知道是這樣子配的,使用數(shù)字+時間標志。比如1h、1M、1m、1d、1s、1ms這種格式就行了。


咳咳,配置文件弄好后也就差不多可以使用rabbitmq發(fā)消息了。
生產(chǎn)端發(fā)消息。只需要使用 RabbitTemplate 類就夠了,看到這個名字,有沒有一種很熟悉的感覺?
Redis也有個這玩意 叫 RedisTemplate
 
關(guān)于發(fā)消息,在這兒最好還是先指定好exchange和routingKey,即交換機和路由鍵。
這樣發(fā)過去的消息才能被發(fā)到指定的交換機上,然后交換機在通過你的routingKey來發(fā)送給綁定了該routingKey的所有隊列。
所以首先登陸管控臺(http://ip:15672),到Exchanges和Queues菜單下,創(chuàng)建好交換機和隊列,還有他們之間的routingKey。這個步驟我就不詳細描述了。單靠語言不怎么能夠描述清楚。估計得配很多圖,有需要的自行g(shù)oogle把。
 
萬事俱備。正式開始發(fā)送消息。
先準備一個要發(fā)的玩意。根據(jù)業(yè)務需求自己創(chuàng)個model就行。我這隨便寫一個。
關(guān)于這個messageId,及消息唯一ID。他的作用是將該條消息數(shù)據(jù)和RabbitMQ發(fā)送的消息綁定起來。不要也不是不行。只是最好還是設置一個這個參數(shù)。

package com.skypyb.rabbitmq.entity;
import java.io.Serializable;
public class User1 implements Serializable{
 private Long id;
 private String name;
 private String messageId;//儲存消息發(fā)送的唯一標識
 public User1() {
 }
 public User1(Long id, String name, String messageId) {
  this.id = id;
  this.name = name;
  this.messageId = messageId;
 }
 public Long getId() {
  return id;
 }
 public void setId(Long id) {
  this.id = id;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 public String getMessageId() {
  return messageId;
 }
 public void setMessageId(String messageId) {
  this.messageId = messageId;
 }
}

要發(fā)送的數(shù)據(jù)模型已經(jīng)準備好,接下來這個類是一個重點。即發(fā)送消息的類。
注入RabbbitTemplate,然后就可以通過他的 convertSendAndReceive() 方法進行消息的發(fā)送。
他有很多種重載,最好是選用我這種,比較可控。交換機、路由鍵、消息唯一ID全部指定好。

package com.skypyb.rabbitmq.producer;
import com.skypyb.rabbitmq.entity.User1;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("user1Sender")
public class User1Sender {
  @Autowired
  private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板
  public void send(User1 user1){
    CorrelationData correlationData= new CorrelationData();
    correlationData.setId(user1.getMessageId());
    rabbitTemplate.convertSendAndReceive(
        "user1-exchange",//exchange
        "user1.key1",//routingKey
        user1,//消息體內(nèi)容
        correlationData//消息唯一ID
    );
  }
}

emmmm,是不是感覺還是挺簡單的。一個方法調(diào)用,消息就過去了。就發(fā)送到指定的交換機了。交換機再通過你的routingKey轉(zhuǎn)發(fā)給綁定在上邊的隊列。生產(chǎn)端這邊就完事了。
 
寫個測試類測試一下。

package com.skypyb.test;
import com.skypyb.rabbitmq.Application;
import com.skypyb.rabbitmq.entity.User1;
import com.skypyb.rabbitmq.producer.User1Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class TestOne {
  @Autowired
  private User1Sender user1Sender;
  @Test
  public void testSend1(){
    User1 user1 = new User1();
    user1.setId(1L);
    user1.setName("測試用戶1");
    user1.setMessageId("user1$"+System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());
    user1Sender.send(user1);
  }
}

運行完畢后。登陸管控臺(http://ip:15672),進入Queues菜單。即可發(fā)現(xiàn)消息隊列中已接收到一條消息,會是一個等待消費的狀態(tài)。
至于到底是哪個消息隊列來處理嘛,那就得看你的exchange通過你的routingKey具體把消息轉(zhuǎn)發(fā)到哪兒了。這個都是在管控臺里邊配置的。
 
生產(chǎn)端準備完畢。接下來是消費端。消費端也很簡單,yml需要添加消費端的配置。簽收模式最好選擇手動簽收??煽?。

server:
 port: 8081
 servlet:
  context-path: /
spring:
 http:
  encoding:
   charset: UTF-8
 jackson:
   #前端頁面?zhèn)鱀ate值時格式化
   date-format: yyyy-MM-dd HH:mm:ss
   time-zone: GMT+8
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai
   username: root
   password: 614
 #rabbitmq基本配置
 rabbitmq:
  addresses: 192.168.194.128:5672
  username: admin
  password: 614
  virtual-host: /
  connection-timeout: 15s
 #rabbitmq消費端配置
  listener:
   simple:
    #并發(fā)數(shù)
    concurrency: 5
    #最大并發(fā)數(shù)
    max-concurrency: 10
    #簽收模式:手工簽收、自動簽收
    acknowledge-mode: manual
    #限流,在此消費端同一時間只有一條消息消費
    prefetch: 1
#Redis配置
 redis:
  host: 192.168.194.128
  port: 6379
  #Redis連接池配置
  jedis:
   pool:
    min-idle: 0
    max-idle: 8
    max-active: 8
    max-wait: -1ms

具體的消費者,具體解釋都寫在注釋中了。
 
關(guān)于@Exchange注解中設置的交換機的type屬性,主要是用這些值:

  • fanout:會把所有發(fā)到Exchange的消息路由到所有和它綁定的Queue
  • direct:會把消息路由到routing key和binding key完全相同的Queue,不相同的丟棄
  • topic:direct是嚴格匹配,那么topic就算模糊匹配,routing key和binding key都用.來區(qū)分單詞串,比如A.B.C,*匹配任意單詞,#匹配任意多個或0個單詞,比如。A.B.*可以匹配到A.B.C
  • headers:不依賴routing key和binding key,通過對比消息屬性中的headers屬性,對比Exchange和Queue綁定時指定的鍵值對,相同就路由過來

basicAck()方法可以確認消息消費。執(zhí)行后,消息隊列中這條消息就沒了。multiple參數(shù)表示是否批量消費,一般都選false。

package com.skypyb.rabbitmq.controller;
import com.rabbitmq.client.Channel;
import com.skypyb.rabbitmq.entity.User1;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class User1Receiver {
  /**
   * @param user1  消息體,使用 @Payload 注解
   * @param headers 消息頭,使用 @Headers 注解
   * @param channel
   */
  /*@RabbitListener表示監(jiān)聽的具體隊列.
    bindings屬性代表綁定。里邊有幾個值填寫,填寫好綁定的隊列名字和交換機名字
    指定好routingKey。若指定的這些參數(shù)不存在的話。則會自行給你創(chuàng)建好
    durable代表是否持久化
  */
  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "user1-queue", durable = "true"),
      exchange = @Exchange(name = "user1-exchange", durable = "true", type = "topic"),
      key = "user1.#"
  )
  )
  @RabbitHandler//標識這個方法用于消費消息
  public void onUser1Message(@Payload User1 user1,
                @Headers Map<String, Object> headers,
                Channel channel) throws IOException {
    //消費者操作
    System.out.println("-------收到消息辣!-----");
    System.out.println("發(fā)過來的用戶名為:" + user1.getName());
    //basicAck()表示確認已經(jīng)消費消息。通知一下mq,需要先得到 delivery tag
    //delivery tag可以從消息頭里邊get出來
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
  }
}

把消費端的服務打開后,就已經(jīng)在監(jiān)聽了。若監(jiān)聽的隊列中已有消息,則會立即處理。直到隊列中沒消息為止。
若隊列為空,他就不會動,這個時候我啟動一下生產(chǎn)者那邊的測試,消息一發(fā)出去,立馬就被消費。非常完美。就是這個效果。
 
呼,偶爾也不想咸魚了啊,今天一天大概把RabbitMQ搞明白一些了,配置也會配了,消息也會發(fā)了。踩了一萬個坑,有不少是那種比較SB的采坑方式,一般人應該踩不到,我就不打出來了。
還是感覺有很多收獲的。就是累成麻瓜了。

以上就是SpringBoot整合RabbitMQ, 實現(xiàn)生產(chǎn)者與消費者的功能的詳細內(nèi)容,更多關(guān)于SpringBoot整合RabbitMQ, 實現(xiàn)生產(chǎn)者與消費者的功能的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java基于循環(huán)遞歸回溯實現(xiàn)八皇后問題算法示例

    Java基于循環(huán)遞歸回溯實現(xiàn)八皇后問題算法示例

    這篇文章主要介紹了Java基于循環(huán)遞歸回溯實現(xiàn)八皇后問題算法,結(jié)合具體實例形式分析了java的遍歷、遞歸、回溯等算法實現(xiàn)八皇后問題的具體步驟與相關(guān)操作技巧,需要的朋友可以參考下
    2017-06-06
  • spring cloud consul注冊的服務報錯critical的解決

    spring cloud consul注冊的服務報錯critical的解決

    這篇文章主要介紹了spring cloud consul注冊的服務報錯critical的解決,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-03-03
  • SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式

    SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式

    這篇文章主要介紹了SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式,分別就轉(zhuǎn)發(fā)和重定向做了概念解說,結(jié)合示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-11-11
  • 如何讓Jackson JSON生成的數(shù)據(jù)包含的中文以unicode方式編碼

    如何讓Jackson JSON生成的數(shù)據(jù)包含的中文以unicode方式編碼

    這篇文章主要介紹了如何讓Jackson JSON生成的數(shù)據(jù)包含的中文以unicode方式編碼。需要的朋友可以過來參考下,希望對大家有所幫助
    2013-12-12
  • Java Springboot自動裝配原理詳解

    Java Springboot自動裝配原理詳解

    這篇文章主要介紹了詳解SpringBoot自動配置原理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2021-10-10
  • 總結(jié)Java對象被序列化的兩種方法

    總結(jié)Java對象被序列化的兩種方法

    今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識,文章圍繞著Java對象被序列化的兩種方法展開,文中有非常詳細的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • mybatis通過XML的方式拼接動態(tài)sql

    mybatis通過XML的方式拼接動態(tài)sql

    動態(tài)SQL是一種在運行時構(gòu)造和執(zhí)行SQL語句的技術(shù),這篇文章主要為大家介紹了mybatis如何通過XML的方式拼接動態(tài)sql,有需要的小伙伴可以參考一下
    2024-12-12
  • JVM垃圾回收原理解析

    JVM垃圾回收原理解析

    這篇文章主要介紹了JVM垃圾回收原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-08-08
  • Java利用redis zset實現(xiàn)延時任務詳解

    Java利用redis zset實現(xiàn)延時任務詳解

    zset作為redis的有序集合數(shù)據(jù)結(jié)構(gòu)存在,排序的依據(jù)就是score。本文就將利用zset score這個排序的這個特性,來實現(xiàn)延時任務,感興趣的可以了解一下
    2022-08-08
  • Java switch多值匹配操作詳解

    Java switch多值匹配操作詳解

    這篇文章主要介紹了Java switch多值匹配操作詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01

最新評論