欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RabbitMQ消息隊(duì)列的交換機(jī)詳解

 更新時(shí)間:2023年07月31日 10:15:09   作者:迷鹿小女子  
這篇文章主要介紹了Java中的RabbitMQ交換機(jī)詳解,消息隊(duì)列是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成,是在消息的傳輸過(guò)程中保存消息的容器,需要的朋友可以參考下

RabbitMQ交換機(jī)

在這里插入圖片描述

交換機(jī)屬性

  • Name:交換機(jī)名稱(chēng)
  • Type:交換機(jī)類(lèi)型 direct、topic、fanout、headers
  • Durability:是否需要持久化,true為持久化
  • Auto Delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列刪除后,自動(dòng)刪除該Exchange
  • Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
  • Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議,定制化使用

直流交換機(jī)

直連交換機(jī)Direct Exchange(完全匹配路由key)

所有發(fā)送到Direct Exchange的消息會(huì)被轉(zhuǎn)發(fā)到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄;

在這里插入圖片描述

消費(fèi)端代碼

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4DirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示聲明了一個(gè)交換機(jī)
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示聲明了一個(gè)隊(duì)列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一個(gè)綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author 小李飛刀
 * @site www.javaxl.com
 * @company
 * @create  2019-11-18 10:22
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
//        String routingKey = "test.direct111"; //收不到
        //5 發(fā)送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

代碼的區(qū)別: 一條消息只會(huì)發(fā)送在一個(gè)隊(duì)列里

在這里插入圖片描述

創(chuàng)建一個(gè)交換機(jī)與隊(duì)列

在這里插入圖片描述

所綁定的交換機(jī)

在這里插入圖片描述

控制臺(tái)輸出

在這里插入圖片描述

主題交換機(jī)

主題交換機(jī)Topic Exchange(匹配路由規(guī)則的交換機(jī))

所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)系RouteKey中指定Topic的Queue上;

Exchange將RouteKey和某Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic;

注意:可以使用通配符進(jìn)行模糊匹配

  • 符號(hào):“#” 匹配一個(gè)或者多個(gè)詞
  • 符號(hào):“” 匹配不多不少一個(gè)詞

列如:

  • “log.#” 能夠匹配到 “log.info.oa”
  • “log.” 能夠匹配到 “log.err”

在這里插入圖片描述

消費(fèi)端代碼

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
//        String routingKey = "user.*";
        // 1 聲明交換機(jī)
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 聲明隊(duì)列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交換機(jī)和隊(duì)列的綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發(fā)送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

代碼的區(qū)別: 一條消息會(huì)發(fā)送在多個(gè)隊(duì)列里 消費(fèi)端:

在這里插入圖片描述

生產(chǎn)端:

在這里插入圖片描述

控制臺(tái)輸出

在這里插入圖片描述

并且可以同時(shí)綁定多個(gè)交換機(jī)

在這里插入圖片描述

輸出交換機(jī)

輸出交換機(jī)Fanout Exchange(不做路由)

  • 不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上;
  • 發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上;
  • Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的

在這里插入圖片描述

消費(fèi)端代碼

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";    //不設(shè)置路由鍵
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱(chēng)、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        //5 發(fā)送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消費(fèi)端:

在這里插入圖片描述

生產(chǎn)端:

在這里插入圖片描述

在這里插入圖片描述

控制臺(tái)輸出

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

Binding-綁定

  • Exchange和Exchange、Queue之間的連接關(guān)系;
  • Binding中可以包含RoutingKey或者參數(shù)

Queue-消息隊(duì)列

  • 消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)
  • Durability:是否持久化
  • Durable:是,Transient:否
  • Auto delete:如選yes,代表當(dāng)最后一個(gè)監(jiān)聽(tīng)被移除之后,該Queue會(huì)自動(dòng)被刪除

Message-消息

  • 服務(wù)器和應(yīng)用程序之間傳遞的數(shù)據(jù)
  • 本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成
  • 常用屬性:delivery model、headers(自定義屬性)

Message-其他屬性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • Timestamp、type、user_id、app_id、cluster_id

Virtual host-虛擬主機(jī)

  • 虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由
  • 一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue
  • 同一個(gè)Virtual Host里面不能有相同名稱(chēng)的Exchange或Queue

總結(jié)一下

交換機(jī)的概念

在沒(méi)有交換機(jī)的時(shí)候,我們的消息隊(duì)列會(huì)處理所有發(fā)給這個(gè)消息隊(duì)列的消息,然后由消費(fèi)者一個(gè)一個(gè)消費(fèi)這個(gè)隊(duì)列里面的消息,如果由集群的話(huà)還會(huì)分?jǐn)倢?duì)這個(gè)消息隊(duì)列的處理。只不過(guò)這里面有一個(gè)

Message acknowledgment的概念

這將會(huì)導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會(huì)越來(lái)越多

在這里插入圖片描述

當(dāng)然一般的消息中間件都不會(huì)這么干,我們使用了交換機(jī)后,我們看到我們的三種策略,其實(shí)都可以說(shuō)由交換機(jī)去找跟它所綁定的消息隊(duì)列,如果生產(chǎn)端的路由鍵不符合要求或找不到消息隊(duì)列定好的路由鍵的話(huà)就會(huì)進(jìn)行其他處理。

在這里插入圖片描述

到此這篇關(guān)于Java中RabbitMQ消息隊(duì)列的交換機(jī)詳解的文章就介紹到這了,更多相關(guān)RabbitMQ交換機(jī)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論