欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

運(yùn)用.net core中實(shí)例講解RabbitMQ

 更新時(shí)間:2021年09月03日 11:25:32   作者:包子wxl  
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件),本文詳細(xì)講解了RabbitMQ以及運(yùn)用.net core中實(shí)例講解其6中模式,感興趣的小伙伴一起來學(xué)習(xí)吧

一、RabbitMQ簡介

是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang(高并發(fā)語言)語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。

(1) AMQP協(xié)議

Advanced Message Queuing Protocol(高級(jí)消息隊(duì)列協(xié)議)

(2)AMQP專業(yè)術(shù)語

(多路復(fù)用->在同一個(gè)線程中開啟多個(gè)通道進(jìn)行操作)

  • Server:又稱broker,接受客戶端的鏈接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
  • Connection:連接,應(yīng)用程序與broker的網(wǎng)絡(luò)連接
  • Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在channel中進(jìn)行,Channel是進(jìn)行消息讀寫的通道??蛻舳丝梢越⒍鄠€(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
  • Message:消息,服務(wù)器與應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成.Properties可以對(duì)消息進(jìn)行修飾,必須消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body則是消息體內(nèi)容。
  • virtualhost: 虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由。一個(gè)virtual host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host 里面不能有相同名稱的Exchange 或 Queue。
  • Exchange:交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)單消息到綁定隊(duì)列
  • Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
  • Routing key: 一個(gè)路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個(gè)特定消息。(如負(fù)載均衡)

(3)RabbitMQ整體架構(gòu)

ClientA(生產(chǎn)者)發(fā)送消息到Exchange1(交換機(jī)),同時(shí)帶上RouteKey(路由Key),Exchange1找到綁定交換機(jī)為它和綁定傳入的RouteKey的隊(duì)列,把消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列,消費(fèi)者Client1,Client2,Client3只需要指定對(duì)應(yīng)的隊(duì)列名即可以消費(fèi)隊(duì)列數(shù)據(jù)。

交換機(jī)和隊(duì)列多對(duì)多關(guān)系,實(shí)際開發(fā)中一般是一個(gè)交換機(jī)對(duì)多個(gè)隊(duì)列,防止設(shè)計(jì)復(fù)雜化。

二、安裝RabbitMQ

安裝方式不影響下面的使用,這里用Docker安裝

 #15672端口為web管理端的端口,5672為RabbitMQ服務(wù)的端口
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

輸入:ip:5672訪問驗(yàn)證。

建一個(gè)名為develop的Virtual host(虛擬主機(jī))使用,項(xiàng)目中一般是一個(gè)項(xiàng)目建一個(gè)Virtual host用,能夠隔離隊(duì)列。

切換Virtual host

三、RabbitMQ六種隊(duì)列模式在.NetCore中使用

(1)簡單隊(duì)列

最簡單的工作隊(duì)列,其中一個(gè)消息生產(chǎn)者,一個(gè)消息消費(fèi)者,一個(gè)隊(duì)列。也稱為點(diǎn)對(duì)點(diǎn)模式

描述:一個(gè)生產(chǎn)者 P 發(fā)送消息到隊(duì)列 Q,一個(gè)消費(fèi)者 C 接收

建一個(gè)RabbitMQHelper.cs類

/// <summary>
    /// RabbitMQ幫助類
    /// </summary>
    public class RabbitMQHelper
    {
        private static ConnectionFactory factory;
        private static object lockObj = new object();
        /// <summary>
        /// 獲取單個(gè)RabbitMQ連接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection()
        {
            if (factory == null)
            {
                lock (lockObj)
                {
                    if (factory == null)
                    {
                         factory = new ConnectionFactory
                        {
                            HostName = "172.16.2.84",//ip
                            Port = 5672,//端口
                            UserName = "admin",//賬號(hào)
                            Password = "123456",//密碼
                            VirtualHost = "develop" //虛擬主機(jī)
                        };
                    }
                }
            }
            return factory.CreateConnection();
        }
    }

生產(chǎn)者代碼

新建發(fā)送類Send.cs

public static void SimpleSendMsg()
        {
            string queueName = "simple_order";//隊(duì)列名
            //創(chuàng)建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創(chuàng)建信道
                using (var channel = connection.CreateModel())
                {//創(chuàng)建隊(duì)列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (var i = 0; i < 10; i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i + 1}";
                        var body = Encoding.UTF8.GetBytes(message);//發(fā)送消息
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
                        Console.WriteLine($"發(fā)送消息到隊(duì)列:{queueName},內(nèi)容:{message}");
                    }
                }
            }
        }

創(chuàng)建隊(duì)列參數(shù)解析:

durable:是否持久化。

exclusive:排他隊(duì)列,只有創(chuàng)建它的連接(connection)能連,創(chuàng)建它的連接關(guān)閉,會(huì)自動(dòng)刪除隊(duì)列。

autoDelete:被消費(fèi)后,消費(fèi)者數(shù)量都斷開時(shí)自動(dòng)刪除隊(duì)列。

arguments:創(chuàng)建隊(duì)列的參數(shù)。

發(fā)送消息參數(shù)解析:

exchange:交換機(jī),為什么能傳空呢,因?yàn)镽abbitMQ內(nèi)置有一個(gè)默認(rèn)的交換機(jī),如果傳空時(shí),就會(huì)用默認(rèn)交換機(jī)。

routingKey:路由名稱,這里用隊(duì)列名稱做路由key。

mandatory:true告訴服務(wù)器至少將消息route到一個(gè)隊(duì)列種,否則就將消息return給發(fā)送者;false:沒有找到路由則消息丟棄。

執(zhí)行效果:

隊(duì)列產(chǎn)生10條消息。

消費(fèi)者代碼

新建Recevie.cs類

public static void SimpleConsumer()
        {
            string queueName = "simple_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創(chuàng)建信道
                var channel = connection.CreateModel();
                {
                    //創(chuàng)建隊(duì)列
                     channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        //消費(fèi)者業(yè)務(wù)處理
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},隊(duì)列{queueName}消費(fèi)消息長度:{message.Length}");
                        i++;
                    };
                    channel.BasicConsume(queueName, true, consumer);
                }
            }
        }

消費(fèi)者只需要知道隊(duì)列名就可以消費(fèi)了,不需要Exchange和routingKey。

注:消費(fèi)者這里有一個(gè)創(chuàng)建隊(duì)列,它本身不需要,是預(yù)防消費(fèi)端程序先執(zhí)行,沒有隊(duì)列會(huì)報(bào)錯(cuò)。

執(zhí)行效果:

消息已經(jīng)被消費(fèi)完。

(2)工作隊(duì)列模式

一個(gè)消息生產(chǎn)者,一個(gè)交換器,一個(gè)消息隊(duì)列,多個(gè)消費(fèi)者。同樣也稱為點(diǎn)對(duì)點(diǎn)模式

生產(chǎn)者P發(fā)送消息到隊(duì)列,多個(gè)消費(fèi)者C消費(fèi)隊(duì)列的數(shù)據(jù)。

工作隊(duì)列也稱為公平性隊(duì)列模式,循環(huán)分發(fā),RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個(gè)消費(fèi)者,每個(gè)消費(fèi)者將獲得相同數(shù)量的消息。

生產(chǎn)者

Send.cs代碼:

/// <summary>
        /// 工作隊(duì)列模式
        /// </summary>
        public static void WorkerSendMsg()
        {
            string queueName = "worker_order";//隊(duì)列名
            //創(chuàng)建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創(chuàng)建信道
                using (var channel = connection.CreateModel())
                {
                    //創(chuàng)建隊(duì)列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    for ( var i=0;i<10;i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i+1}";
                        var body = Encoding.UTF8.GetBytes(message);
                        //發(fā)送消息到rabbitmq
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
                        Console.WriteLine($"發(fā)送消息到隊(duì)列:{queueName},內(nèi)容:{message}");
                    }
                }
            }
        }

參數(shù)durable:true,需要持久化,實(shí)際項(xiàng)目中肯定需要持久化的,不然重啟RabbitMQ數(shù)據(jù)就會(huì)丟失了。

執(zhí)行效果:

寫入10條數(shù)據(jù),有持久化標(biāo)識(shí)D

消費(fèi)端

Recevie代碼:

public static void WorkerConsumer()
        {
            string queueName = "worker_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創(chuàng)建信道
                var channel = connection.CreateModel();
                {
                    //創(chuàng)建隊(duì)列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    //prefetchCount:1來告知RabbitMQ,不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,也確保了消費(fèi)速度和性能
                    channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
                    int i = 1;
                    int index = new Random().Next(10);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業(yè)務(wù)
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},消費(fèi)者:{index},隊(duì)列{queueName}消費(fèi)消息長度:{message.Length}");
                         channel.BasicAck(ea.DeliveryTag, false); //消息ack確認(rèn),告訴mq這條隊(duì)列處理完,可以從mq刪除了               Thread.Sleep(1000);                        i++;
                    };
                    channel.BasicConsume(queueName,autoAck:false, consumer);
                }
            }
        }

BasicQos參數(shù)解析:

prefetchSize:每條消息大小,一般設(shè)為0,表示不限制。

prefetchCount:1,作用限流,告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,消費(fèi)者會(huì)把N條消息緩存到本地一條條消費(fèi),如果不設(shè),RabbitMQ會(huì)進(jìn)可能快的把消息推到客戶端,導(dǎo)致客戶端內(nèi)存升高。設(shè)置合理可以不用頻繁從RabbitMQ 獲取能提升消費(fèi)速度和性能,設(shè)的太多的話則會(huì)增大本地內(nèi)存,需要根據(jù)機(jī)器性能合理設(shè)置,官方建議設(shè)為30。

global:是否為全局設(shè)置。

這些限流設(shè)置針對(duì)消費(fèi)者autoAck:false時(shí)才有效,如果是自動(dòng)Ack的,限流不生效。

執(zhí)行兩個(gè)消費(fèi)者,效果:

可以看到消費(fèi)者號(hào)的標(biāo)識(shí),8,2,8,2是平均的,一個(gè)消費(fèi)者5個(gè),RabbitMQ上也能看到有2個(gè)消費(fèi)者,Unacked數(shù)是2,因?yàn)槊總€(gè)客戶端的限流數(shù)是1。

工作隊(duì)列模式也是很常用的隊(duì)列模式。

(3)發(fā)布訂閱模式

Pulish/Subscribe,無選擇接收消息,一個(gè)消息生產(chǎn)者,一個(gè)交換機(jī)(交換機(jī)類型為fanout),多個(gè)消息隊(duì)列,多個(gè)消費(fèi)者。稱為發(fā)布/訂閱模式

在應(yīng)用中,只需要簡單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。

生產(chǎn)者P只需把消息發(fā)送到交換機(jī)X,綁定這個(gè)交換機(jī)的隊(duì)列都會(huì)獲得一份一樣的數(shù)據(jù)。

應(yīng)用場景:適合于用同一份數(shù)據(jù)源做不同的業(yè)務(wù)。

生產(chǎn)者代碼

/// <summary>
        /// 發(fā)布訂閱, 扇形隊(duì)列
        /// </summary>
        public static void SendMessageFanout()
        {
            //創(chuàng)建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創(chuàng)建信道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "fanout_exchange";
                    //創(chuàng)建交換機(jī),fanout類型
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    string queueName1 = "fanout_queue1";
                    string queueName2 = "fanout_queue2";
                    string queueName3 = "fanout_queue3";
                    //創(chuàng)建隊(duì)列
                    channel.QueueDeclare(queueName1, false, false, false);
                    channel.QueueDeclare(queueName2, false, false, false);
                    channel.QueueDeclare(queueName3, false, false, false);

                    //把創(chuàng)建的隊(duì)列綁定交換機(jī),routingKey不用給值,給了也沒意義的
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交換機(jī)寫10條消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "", null, body);
                        Console.WriteLine($"發(fā)送Fanout消息:{message}");
                    }
                }
            }
        }

執(zhí)行代碼:

向交換機(jī)發(fā)送10條消息,則綁定這個(gè)交換機(jī)的3個(gè)隊(duì)列都會(huì)有10條消息。

消費(fèi)端的代碼和工作隊(duì)列的一樣,只需知道隊(duì)列名即可消費(fèi),聲明時(shí)要和生產(chǎn)者的聲明一樣。

(4)路由模式(推薦使用)

在發(fā)布/訂閱模式的基礎(chǔ)上,有選擇的接收消息,也就是通過 routing 路由進(jìn)行匹配條件是否滿足接收消息。

上圖是一個(gè)結(jié)合日志消費(fèi)級(jí)別的配圖,在路由模式它會(huì)把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。

生產(chǎn)者P發(fā)送數(shù)據(jù)是要指定交換機(jī)(X)和routing發(fā)送消息 ,指定的routingKey=error,則隊(duì)列Q1和隊(duì)列Q2都會(huì)有一份數(shù)據(jù),如果指定routingKey=into,或=warning,交換機(jī)(X)只會(huì)把消息發(fā)到Q2隊(duì)列。

生產(chǎn)者代碼

/// <summary>
        /// 路由模式,點(diǎn)到點(diǎn)直連隊(duì)列
        /// </summary>
        public static void SendMessageDirect()
        {
            //創(chuàng)建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創(chuàng)建信道
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機(jī)對(duì)象,fanout類型
                    string exchangeName = "direct_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //創(chuàng)建隊(duì)列
                    string queueName1 = "direct_errorlog";
                    string queueName2 = "direct_alllog";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把創(chuàng)建的隊(duì)列綁定交換機(jī),direct_errorlog隊(duì)列只綁定routingKey:error
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
                    //direct_alllog隊(duì)列綁定routingKey:error,info
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //消息持久化
                    //向交換機(jī)寫10條錯(cuò)誤日志和10條Info日志
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} error Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
                        Console.WriteLine($"發(fā)送Direct消息error:{message}");

                        string message2 = $"RabbitMQ Direct {i + 1} info Message";
                        var body2 = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
                        Console.WriteLine($"info:{message2}");

                    }
                }
            }
        }

這里創(chuàng)建一個(gè)direct類型的交換機(jī),兩個(gè)路由key,一個(gè)error,一個(gè)info,兩個(gè)隊(duì)列,一個(gè)隊(duì)列只綁定error,一個(gè)隊(duì)列綁定error和info,向error和info各發(fā)10條消息。

執(zhí)行代碼:

查看RabbitMQ管理界面,direct_errorlog隊(duì)列10條,而direct_alllog有20條,因?yàn)閐irect_alllog隊(duì)列兩個(gè)routingKey的消息都進(jìn)去了。

點(diǎn)進(jìn)去看下兩個(gè)隊(duì)列綁定的交換機(jī)和routingKey

消費(fèi)者代碼

消費(fèi)者和工作隊(duì)列一樣,只需根據(jù)隊(duì)列名消費(fèi)即可,這里只消費(fèi)direct_errorlog隊(duì)列作示例

public static void DirectConsumer()
        {
            string queueName = "direct_errorlog";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創(chuàng)建信道
                var channel = connection.CreateModel();
                {
                    //創(chuàng)建隊(duì)列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    ///prefetchCount:1來告知RabbitMQ,不要同時(shí)給一個(gè)消費(fèi)者推送多于 N 個(gè)消息,也確保了消費(fèi)速度和性能
                    ///global:是否設(shè)為全局的
                    ///prefetchSize:單條消息大小,通常設(shè)0,表示不做限制
                    //是autoAck=false才會(huì)有效
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    int i = 1;
                    consumer.Received += (model, ea) =>
                    {
                        //處理業(yè)務(wù)
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},隊(duì)列{queueName}消費(fèi)消息長度:{message.Length}");
                        channel.BasicAck(ea.DeliveryTag, false); //消息ack確認(rèn),告訴mq這條隊(duì)列處理完,可以從mq刪除了
                        i++;
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

普通場景中推薦使用路由模式,因?yàn)槁酚赡J接薪粨Q機(jī),有路由key,能夠更好的拓展各種應(yīng)用場景。

(5)主題模式

topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似于SQL中 = 和 like 的關(guān)系。

P 表示為生產(chǎn)者、 X 表示交換機(jī)、C1C2 表示為消費(fèi)者,紅色表示隊(duì)列。

topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個(gè)英文句點(diǎn)號(hào)“.”分隔的字符串(我們將被句點(diǎn)號(hào)“.”分隔開的每一段獨(dú)立的字符串稱為一個(gè)單詞),比如 "lazy.orange.a"。topics routingKey 中可以存在兩種特殊字符"*"與“#”,用于做模糊匹配,其中“*”用于匹配一個(gè)單詞,“#”用于匹配多個(gè)單詞(可以是零個(gè))。

以上圖為例:

如果發(fā)送消息的routingKey設(shè)置為:

aaa.orange.rabbit,那么消息會(huì)路由到Q1與Q2,

routingKey=aaa.orange.bb的消息會(huì)路由到Q1,

routingKey=lazy.aa.bb.cc的消息會(huì)路由到Q2;

routingKey=lazy.aa.rabbit的消息會(huì)路由到 Q2(只會(huì)投遞給Q2一次,雖然這個(gè)routingKey 與 Q2 的兩個(gè) bindingKey 都匹配);

沒匹配routingKey的消息將會(huì)被丟棄。

生產(chǎn)者代碼

public static void SendMessageTopic()
        {
            //創(chuàng)建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創(chuàng)建信道
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機(jī)對(duì)象,fanout類型
                    string exchangeName = "topic_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //隊(duì)列名
                    string queueName1 = "topic_queue1";
                    string queueName2 = "topic_queue2";
                    //路由名
                    string routingKey1 = "*.orange.*";
                    string routingKey2 = "*.*.rabbit";
                    string routingKey3 = "lazy.#";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把創(chuàng)建的隊(duì)列綁定交換機(jī),routingKey指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
                    //向交換機(jī)寫10條消息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
                        channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
                        Console.WriteLine($"發(fā)送Topic消息:{message}");
                    }
                }
            }
        }

這里演示了 routingKey為aaa.orange.rabbit,和lazy.aa.rabbit的情況,第一個(gè)匹配到Q1和Q2,第二個(gè)匹配到Q2,所以應(yīng)該Q1是10條,Q2有20條,

執(zhí)行后看rabbitMQ界面:

(6)RPC模式

與上面其他5種所不同之處,該模式是擁有請(qǐng)求/回復(fù)的。也就是有響應(yīng)的,上面5種都沒有。

RPC是指遠(yuǎn)程過程調(diào)用,也就是說兩臺(tái)服務(wù)器A,B,一個(gè)應(yīng)用部署在A服務(wù)器上,想要調(diào)用B服務(wù)器上應(yīng)用提供的處理業(yè)務(wù),處理完后然后在A服務(wù)器繼續(xù)執(zhí)行下去,把異步的消息以同步的方式執(zhí)行。

客戶端(C)聲明一個(gè)排他隊(duì)列自己訂閱,然后發(fā)送消息到RPC隊(duì)列同時(shí)也把這個(gè)排他隊(duì)列名也在消息里傳進(jìn)去,服務(wù)端監(jiān)聽RPC隊(duì)列,處理完業(yè)務(wù)后把處理結(jié)果發(fā)送到這個(gè)排他隊(duì)列,然后客戶端收到結(jié)果,繼續(xù)處理自己的邏輯。

RPC的處理流程:

  • 當(dāng)客戶端啟動(dòng)時(shí),創(chuàng)建一個(gè)匿名的回調(diào)隊(duì)列。
  • 客戶端為RPC請(qǐng)求設(shè)置2個(gè)屬性:replyTo:設(shè)置回調(diào)隊(duì)列名字;correlationId:標(biāo)記request。
  • 請(qǐng)求被發(fā)送到rpc_queue隊(duì)列中。
  • RPC服務(wù)器端監(jiān)聽rpc_queue隊(duì)列中的請(qǐng)求,當(dāng)請(qǐng)求到來時(shí),服務(wù)器端會(huì)處理并且把帶有結(jié)果的消息發(fā)送給客戶端。接收的隊(duì)列就是replyTo設(shè)定的回調(diào)隊(duì)列。
  • 客戶端監(jiān)聽回調(diào)隊(duì)列,當(dāng)有消息時(shí),檢查correlationId屬性,如果與request中匹配,那就是結(jié)果了。

服務(wù)端代碼

public class RPCServer
    {
        public static void RpcHandle()
        {

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    string queueName = "rpc_queue";
                    channel.QueueDeclare(queue: queueName, durable: false,
                      exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: queueName,
                      autoAck: false, consumer: consumer);
                    Console.WriteLine("【服務(wù)端】等待RPC請(qǐng)求...");

                    consumer.Received += (model, ea) =>
                    {
                        string response = null;

                        var body = ea.Body.ToArray();
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"【服務(wù)端】接收到數(shù)據(jù):{ message},開始處理");
                            response = $"消息:{message},處理完成";
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("錯(cuò)誤:" + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                              multiple: false);
                        }
                    };
                }
            }
        }
       
    }

客戶端

public class RPCClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RPCClient()
        {
            connection = RabbitMQHelper.GetConnection();

            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId; //給消息id
            props.ReplyTo = replyQueueName;//回調(diào)的隊(duì)列名,Client關(guān)閉后會(huì)自動(dòng)刪除

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                //監(jiān)聽的消息Id和定義的消息Id相同代表這條消息服務(wù)端處理完成
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            //發(fā)送消息
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            //等待回復(fù)
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }

執(zhí)行代碼

static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //啟動(dòng)服務(wù)端,正常邏輯是在另一個(gè)程序
            RPCServer.RpcHandle();
            //實(shí)例化客戶端
            var rpcClient = new RPCClient();
            string message = $"消息id:{new Random().Next(1, 1000)}";
            Console.WriteLine($"【客服端】RPC請(qǐng)求中,{message}");
            //向服務(wù)端發(fā)送消息,等待回復(fù)
            var response = rpcClient.Call(message);
            Console.WriteLine("【客服端】收到回復(fù)響應(yīng):{0}", response);
            rpcClient.Close();
            Console.ReadKey();
        }

測試效果:

z執(zhí)行完,客服端close后,可以接著自己的下一步業(yè)務(wù)處理。

總結(jié)

以上便是RabbitMQ的6中模式在.net core中實(shí)際使用,其中(1)簡單隊(duì)列,(2)工作隊(duì)列,(4)路由模式,(6)RPC模式的交換機(jī)類型都是direct,(3)發(fā)布訂閱的交換機(jī)是fanout,(5)topics的交換機(jī)是topic。正常場景用的是direct,默認(rèn)交換機(jī)也是direct類型的,推薦用(4)路由模式,因?yàn)橹付ń粨Q機(jī)名比起默認(rèn)的交換機(jī)會(huì)容易擴(kuò)展場景,其他的交換機(jī)看業(yè)務(wù)場景所需使用。

下面位置可以看到交換機(jī)類型,amq.開頭那幾個(gè)是內(nèi)置的,避免交換機(jī)過多可以直接使用。

到此這篇關(guān)于運(yùn)用.net core中實(shí)例講解RabbitMQ的文章就介紹到這了,更多相關(guān).net core RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • ASP.NET內(nèi)置對(duì)象之Application對(duì)象

    ASP.NET內(nèi)置對(duì)象之Application對(duì)象

    Application對(duì)象是HttpApplicationState類的一個(gè)實(shí)例,它可以產(chǎn)生一個(gè)所有Web應(yīng)用程序都可以存取的變量,這個(gè)變量的可以存取范圍涵蓋全部使用者,也就是說只要正在使用這個(gè)網(wǎng)頁的程序都可以存取這個(gè)變量。
    2008-09-09
  • asp.net下Oracle,SQL Server,Access萬能數(shù)據(jù)庫通用類

    asp.net下Oracle,SQL Server,Access萬能數(shù)據(jù)庫通用類

    Oracle,SQL Server,Access萬能數(shù)據(jù)庫通用類!,使用asp.net開發(fā)多數(shù)據(jù)庫系統(tǒng)的朋友可以參考下。
    2010-10-10
  • 正確使用dotnet-*工具的方法

    正確使用dotnet-*工具的方法

    這篇文章介紹了正確使用dotnet-*工具的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-12-12
  • 在asp.net(C#)中采用自定義標(biāo)簽和XML、XSL顯示數(shù)據(jù)

    在asp.net(C#)中采用自定義標(biāo)簽和XML、XSL顯示數(shù)據(jù)

    在asp.net(C#)中采用自定義標(biāo)簽和XML、XSL顯示數(shù)據(jù)的實(shí)現(xiàn)代碼。
    2009-06-06
  • .NET使用CsvHelper快速讀取和寫入CSV文件的操作方法

    .NET使用CsvHelper快速讀取和寫入CSV文件的操作方法

    在日常開發(fā)中使用CSV文件進(jìn)行數(shù)據(jù)導(dǎo)入和導(dǎo)出、數(shù)據(jù)交換是非常常見的需求,今天我們來講講在.NET中如何使用CsvHelper這個(gè)開源庫快速實(shí)現(xiàn)CSV文件讀取和寫入,需要的朋友可以參考下
    2024-06-06
  • Asp.Net實(shí)現(xiàn)FORM認(rèn)證的一些使用技巧(必看篇)

    Asp.Net實(shí)現(xiàn)FORM認(rèn)證的一些使用技巧(必看篇)

    下面小編就為大家?guī)硪黄狝sp.Net實(shí)現(xiàn)FORM認(rèn)證的一些使用技巧(必看篇)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2016-08-08
  • ASP.NET中DataTable與DataSet之間的轉(zhuǎn)換示例

    ASP.NET中DataTable與DataSet之間的轉(zhuǎn)換示例

    如果你的數(shù)據(jù)不需要做關(guān)系映射,直接用DataTable效率比較高,下面有個(gè)不錯(cuò)的示例,感興趣的朋友可以參考下
    2013-09-09
  • 二級(jí)域名Cookie問題的解決方法

    二級(jí)域名Cookie問題的解決方法

    今天博客園全面采用二級(jí)域名后,發(fā)現(xiàn)即使用戶已經(jīng)登錄,但在訪問二級(jí)域名Blog頁面時(shí)都顯示沒有登錄(表現(xiàn)為發(fā)表評(píng)論時(shí)要求輸入驗(yàn)證碼, 收藏功能無法正常使用),再次登錄后,進(jìn)入其他二級(jí)域名還是需要登錄。
    2008-10-10
  • Asp.net中使用文本框的值動(dòng)態(tài)生成控件的方法

    Asp.net中使用文本框的值動(dòng)態(tài)生成控件的方法

    這篇文章主要介紹了Asp.net中使用文本框的值動(dòng)態(tài)生成控件的方法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2016-05-05
  • DataGrid使用心得(調(diào)用及連接數(shù)據(jù)庫等等)

    DataGrid使用心得(調(diào)用及連接數(shù)據(jù)庫等等)

    在工作中遇到把DataGrid中綁定的后臺(tái)數(shù)據(jù)庫數(shù)據(jù)展示給用戶時(shí)把負(fù)數(shù)變?yōu)?的小問題,現(xiàn)在把它記錄下來包括DataGrid的調(diào)用/連接數(shù)據(jù)庫進(jìn)行操作等等,感興趣的朋友可以了解下,或許本新得對(duì)你有所幫助
    2013-02-02

最新評(píng)論