欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ消息中間件示例詳解

 更新時(shí)間:2018年12月16日 14:26:15   作者:-Finley-  
這篇文章主要給大家介紹了關(guān)于RabbitMQ消息中間件的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一學(xué)習(xí)學(xué)習(xí)吧

前言

RabbitMQ 是使用 Erlang 語言開發(fā)的消息中間件, 其遵循了高級(jí)消息隊(duì)列協(xié)議(Advanced Message Queuing Protocol, AMQP)。

與 Kafka 等消息隊(duì)列相比,RabbitMQ 最大的優(yōu)勢(shì)在于其較高的可靠性:

  • 提供確認(rèn)(ACK)和重傳機(jī)制保證消息完成消費(fèi), 消費(fèi)者異常不會(huì)導(dǎo)致消息丟失
  • 提供消息持久化機(jī)制, broker 崩潰不會(huì)導(dǎo)致消息丟失
  • 集群模式下工作, 保證高可用

因?yàn)榫哂休^高可靠性和一致性, RabbitMQ 可以勝任訂單處理、秒殺等一致性要求較高的業(yè)務(wù)場(chǎng)景。

RabbitMQ 概念與機(jī)制

RabbitMQ 中的概念模型:

  • Broker: 消息中間件實(shí)例, 可能是單個(gè)節(jié)點(diǎn)也可能是運(yùn)行在多節(jié)點(diǎn)集群上的邏輯實(shí)體
  • 消息(Message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標(biāo)準(zhǔn)消息頭以及其它自定義消息頭,用于定義RabbitMQ對(duì)消息行為。消息體是字節(jié)流,包含消息內(nèi)容。
  • 連接(Connection): 客戶端與 Broker 之間的 TCP連接
  • 信道(Channel): Channel 是建立在 TCP 連接上的邏輯(虛擬)連接。多個(gè) Channel 復(fù)用同一個(gè) TCP 連接, 以避免建立 TCP 連接的巨大開銷。 RabbitMQ 官方要求每個(gè)線程使用獨(dú)立的 Channel, 禁止多個(gè)線程共用 Channel。
  • 生產(chǎn)者(Publisher): 發(fā)送消息的客戶端線程
  • 消費(fèi)者(Consumer): 處理消息的客戶端線程
  • 交換機(jī)(Exchange): 交換機(jī)負(fù)責(zé)將消息投遞到相應(yīng)的隊(duì)列
  • 隊(duì)列(Queue): 接收并保存交換機(jī)投遞的消息,直至被消費(fèi)者成功消費(fèi)。邏輯結(jié)構(gòu)遵循先進(jìn)先出FIFO。
  • 綁定(Binding): 將隊(duì)列(Queue)注冊(cè)到交換機(jī)(Exchange)的路由表
  • 虛擬主機(jī)(Vhost): 每個(gè)Broker下可建立多個(gè)vhost, 每個(gè) vhost 可建立獨(dú)立的 Exchange、Queue、綁定及權(quán)限系統(tǒng)。同一個(gè) Broker 下的 vhost 共享 Connection、Channel 和 用戶系統(tǒng),就是說可以使用同一個(gè)用戶身份使用同一個(gè) Channel 訪問不同 vhost。

交換機(jī)(Exchange)

生產(chǎn)者發(fā)送的消息會(huì)首先送到交換機(jī)(Exchange), 交換機(jī)根據(jù)自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊(duì)列中。

RabbitMQ中的四種標(biāo)準(zhǔn)交換機(jī):

direct: 如果消息的 routing-key 與隊(duì)列的 binding-key 完全相同,direct類型的交換機(jī)則會(huì)將消息投遞到該隊(duì)列中。

  • 多個(gè)隊(duì)列可以使用相同的 binding-key 綁定到同一個(gè) direct 交換機(jī),direct 交換機(jī)會(huì)把消息投遞到所有 binding-key 與消息 routing-key 相同的隊(duì)列

topic: 允許隊(duì)列的 binding-key 中包含通配符*和#, topic 交換機(jī)會(huì)將消息投遞到 binding-key 與 routing-key 匹配的隊(duì)列中。

  • 通配符按照關(guān)鍵字進(jìn)行匹配,如news.cn.a中的關(guān)鍵字是news、cn和a,即關(guān)鍵字按照.分割
  • #通配符匹配0個(gè)或多個(gè)關(guān)鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
  • *通配符匹配一個(gè)關(guān)鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

fanout: fanout 交換機(jī)不進(jìn)行任何匹配, 將消息投遞到所有綁定的隊(duì)列

header: header 交換機(jī)根據(jù)消息頭進(jìn)行投遞,現(xiàn)在已較少使用

我們可以使用 RabbitMQ 的插件機(jī)制使用第三方交換機(jī)或自行開發(fā)交換機(jī)。如實(shí)現(xiàn)延時(shí)投遞的delayed-message-exchange。

消息頭中的delivery-mode可以設(shè)置為 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在處理持久化的消息時(shí)都會(huì)先將消息寫入磁盤中再進(jìn)行下一步處理, 即使 RabbitMQ 崩潰也不會(huì)丟失。

消費(fèi)者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息, 即當(dāng)有新消息時(shí) Broker 通過 channel 主動(dòng)向客戶端發(fā)送消息。客戶端也可以使用channel.basicGet從 Broker 拉取消息。

ACK機(jī)制

RabbitMQ 提供了確認(rèn)送達(dá)(acknowledge)機(jī)制保證消息被正確處理不會(huì)丟失。

確認(rèn)送達(dá)的回執(zhí)有三種:

  • ACK: 消息已被成功處理
  • NACK: 消息處理異常, 需要重新投遞
  • REJECT: 消息非法, 丟棄消息

RabbitMQ 的 Queue 可以設(shè)置 no_ack=true, 則消息被投遞后即刪除不等待回執(zhí)。

channel.basicConsume 可以指定auto_ack模式,若auto_ack=true當(dāng)客戶端收到完整消息后即會(huì)自動(dòng)發(fā)出ACK回執(zhí),否則必須顯式的發(fā)出回執(zhí)。

Java 代碼示例

首先安裝并啟動(dòng)RabbitMQ實(shí)例, Mac用戶可以使用 Homebrew 進(jìn)行安裝:

brew install rabbitmq

啟動(dòng)服務(wù):

brew services start rabbitmq

或者使用官方docker鏡像:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

RabbitMQ官網(wǎng)提供了Ubuntu、RPM以及Windows等多種平臺(tái)安裝方式。

RabbitMQ默認(rèn)TCP端口為5672, Web控制臺(tái)默認(rèn)端口15672。

在Maven中添加依賴:

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.5.1</version>
</dependency>

編寫生產(chǎn)者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author finley
 */
public class RabbitProducer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String routingKey = "hello";

   byte[] msg = "hello world".getBytes();
   AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
   propsBuilder.deliveryMode(2); // persistent
   propsBuilder.priority(0); // normal
   propsBuilder.contentType("text/plain");
   channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
  }
 }
}

編寫消費(fèi)者:

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * @author finley
 */
public class RabbitConsumer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String queueName = channel.queueDeclare().getQueue();
   String bindingKey = "hello";
   channel.queueBind(queueName, exchangeName, bindingKey);

   while(true) {
    channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      String routingKey = envelope.getRoutingKey();
      String contentType = properties.getContentType();
      String bodyStr = new String(body, "UTF-8");
      System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
      long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
     }
    });
   }
  }
 }

}

RabbitMQ 的消息為字節(jié), 可以將 Java 對(duì)象序列化后作為消息體發(fā)送。

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • java實(shí)現(xiàn)Socket通信之單線程服務(wù)

    java實(shí)現(xiàn)Socket通信之單線程服務(wù)

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)Socket通信的單線程服務(wù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • 利用JavaFX工具構(gòu)建Reactive系統(tǒng)

    利用JavaFX工具構(gòu)建Reactive系統(tǒng)

    這篇文章主要介紹了使用JavaFX構(gòu)建Reactive系統(tǒng),利用JavaFX工具集中的新的超棒特性來構(gòu)建響應(yīng)式的快速應(yīng)用程序,感興趣的小伙伴們可以參考一下
    2016-02-02
  • Java下載文件的4種方式總結(jié)

    Java下載文件的4種方式總結(jié)

    這篇文章主要給大家總結(jié)介紹了關(guān)于Java下載文件的4種方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • 一篇文章帶你搞定 springsecurity基于數(shù)據(jù)庫的認(rèn)證(springsecurity整合mybatis)

    一篇文章帶你搞定 springsecurity基于數(shù)據(jù)庫的認(rèn)證(springsecurity整合mybatis)

    這篇文章主要介紹了一篇文章帶你搞定 springsecurity基于數(shù)據(jù)庫的認(rèn)證(springsecurity整合mybatis),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • 如何為Repository添加自定義方法

    如何為Repository添加自定義方法

    這篇文章主要介紹了如何為Repository添加自定義方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • 最新評(píng)論