如何用.NETCore操作RabbitMQ
什么是RabbitMQ?
RabbitMQ是由erlang語言開發(fā)的一個基于AMQP(Advanced Message Queuing Protocol)協(xié)議的企業(yè)級消息隊列中間件??蓪崿F(xiàn)隊列,訂閱/發(fā)布,路由,通配符等工作模式。
為什么要使用RabbitMQ?
- 異步處理:比如發(fā)送郵件,發(fā)送短信等不需要等待處理結(jié)果的操作
- 應(yīng)用解耦:比如下單成功后,通知倉庫發(fā)貨,不需要等待倉庫回應(yīng),通過消息隊列去通知倉庫,降低應(yīng)用間耦合程序,可并行開發(fā)兩個功能模塊
- 流量削鋒:在搶購或者其他的活動頁,服務(wù)處于爆發(fā)式請求狀態(tài),如果直連數(shù)據(jù)庫,數(shù)據(jù)庫容易被拖垮。搶購商品也容易出現(xiàn)庫存超賣的情況。通過隊列可有效解決該問題。
- 日志處理:在單機中,日志直接寫入到文件目錄中,但是在分布式應(yīng)用中,日志需要有統(tǒng)一的處理機制,可通過消息隊列統(tǒng)一由某個消費端做處理。
- 消息通信:如生產(chǎn)端和消費端可通過隊列進行異步通信
如何安裝RabbitMQ?
Windows端
1.安裝erlang語言運行環(huán)境
https://erlang.org/download/otp_win64_23.2.exe
下載后直接下一步即可
2.安裝RabbitMQ
https://www.rabbitmq.com/install-windows.html
直接點擊安裝下一步即可按章
3.安裝RabbitMQ的Web管理平臺
RabbitMQ的管理平臺是通過插件的形式使用,需要手動啟用管理平臺
在Windows下,RabbitMQ默認被安裝到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打開sbin ,在cmd或者powershell中執(zhí)行
rabbitmq-plugins.bat enable rabbitmq_management
安裝完成后,瀏覽器打開 http://localhost:15672/ 即可看到RabbitMQ的管理界面。輸入默認賬號密碼 guest 成功登錄。
Linux環(huán)境安裝
1.Ubuntu:https://www.rabbitmq.com/install-debian.html
2.Centos:https://www.rabbitmq.com/install-rpm.html
RabbitMQ的基本概念
生產(chǎn)者
發(fā)送消息的端
消費者
獲取消息并處理的端
Connection
一個終端連接。每一個Connection都可以在RabbitMQ后臺看到
Channel
Channel是建立在Connection上的一個虛擬通信管道。一般情況下,往消息隊列中寫入多條消息,為了不每條消息都建立一個TCP連接,所以RabbitMQ的做法是多條消息可以公用一個Connection,大大提高MQ的負載能力。
Exchange
Exchange是一個虛擬交換機。每一條消息都必須要通過交換機才能能進入對應(yīng)的隊列,可以理解為網(wǎng)絡(luò)設(shè)備中的交換機,是一個意思。
Queue
Queue是一個存儲消息的內(nèi)部對象,所有的Rabbit MQ消息都存儲在Queue中。生產(chǎn)者所生產(chǎn)的消息會存儲在Queue中,消費者獲取的消息也是從Queue中獲取。
如何在.NET Core中使用RabbitMQ?
nuget安裝
dotnet add package RabbitMQ.Client
創(chuàng)建生產(chǎn)者
const string QUEUENAME = "HELLO_MQ"; //創(chuàng)建連接對象工廠 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默認的端口 }; while (true) { using var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); Console.WriteLine("輸入生產(chǎn)內(nèi)容:"); var input = Console.ReadLine(); chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在循環(huán)中,輸入一個值,按下enter,即可推送一條消息到隊列。
也可以直接在RabbitMQ的管理后臺查看
可以看到我們發(fā)送的消息已經(jīng)被RabbitMQ存儲在Queue中了。只等某個幸運的消費者前來消費。
創(chuàng)建消費者
const string QUEUENAME = "HELLO_MQ"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); EventingBasicConsumer consumer = new EventingBasicConsumer(chanel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); chanel.BasicAck(e.DeliveryTag, true); //收到回復(fù)后,RabbitMQ會直接在隊列中刪除這條消息 }; chanel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啟動成功"); Console.ReadLine();
啟動成功后,consumer的Received方法,會收到一條來自MQ的消息,
如果處理完成后,不調(diào)用chennel的BasicAck方法,那么這條消息依然會存在,下次有消費者出現(xiàn),會再次推送給消費者。
簡單的RabbitMQ Hello World到這里就算完成了。接下來就是稍微高級一點的應(yīng)用
RabbitMQ的工作模式
Work Queue 工作隊列模式
工作隊列模式的意思就是一個生產(chǎn)者對應(yīng)多個消費者。RabbitMQ會使用輪詢?nèi)ソo每個消費者發(fā)送消息。
publish/subscribe
發(fā)布訂閱模式是屬于比較用多的一種。
發(fā)布訂閱,是由交換機發(fā)布消息給多個隊列。多個隊列再對應(yīng)多個消費者。
發(fā)布訂閱模式對應(yīng)的交換機類型的fanout。
消費者
A
const string QUEUENAME = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回復(fù)后,RabbitMQ會直接在隊列中刪除這條消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啟動成功"); Console.ReadLine();
B
const string QUEUENAME = "HELLO_MQ"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回復(fù)后,RabbitMQ會直接在隊列中刪除這條消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啟動成功"); Console.ReadLine();
生產(chǎn)者
const string QUEUENAME = "HELLO_MQ"; const string QUEUENAME_B = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; //創(chuàng)建連接對象工廠 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默認的端口 }; using var conn = factory.CreateConnection(); while (true) { var channel = conn.CreateModel(); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); Console.WriteLine("輸入生產(chǎn)內(nèi)容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在生產(chǎn)者運行成功后,RabbitMQ后臺會出現(xiàn)一個交換機,點擊交換機會看到交換機下綁定了兩個隊列
從生產(chǎn)者發(fā)送消息到隊列,兩個消費者會同時收到消息
routing模式
routing模式對應(yīng)的交換機類型是direct,和發(fā)布訂閱模式的區(qū)別在于:routing模式下,可以指定一個routingkey,用于區(qū)分消息
生產(chǎn)者
var channel = conn.CreateModel(); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 Console.WriteLine("輸入生產(chǎn)內(nèi)容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
消費者 A
//定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");
消費者 B
//定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");
綁定成功后,發(fā)送消息,消費者A可以收到消息,消費者B無法收到消息。
如果遇到指定routingKey生產(chǎn)一條消息,結(jié)果 AB消費者都收到的情況。建議在RabbitMQ后臺的交換機下看一下綁定的Queue是否重復(fù)綁定了多個routingKey.
topic通配符模式
在通配符模式下,RabbitMQ使用模糊匹配來決定把消息推送給哪個生產(chǎn)者。通配符有兩個符號來匹配routingKey
1.*匹配一個字符 如:*.qq.com 可匹配 1.qq.com
2.#匹配一個或者多個字符。 如:*.qq.com 可匹配 1.qq.com或者1111.qq.com
其他的操作基本和routing模式一樣。
header模式
header模式是把routingkey放到header中.取消掉了routingKey。并使用一個字典傳遞 K、V的方式來匹配。
比如同時要給用戶發(fā)送郵件和短信,可直接通過header的鍵值對來匹配綁定的值,把消息傳遞給發(fā)短信和郵件的生產(chǎn)者.
以上就是如何用.NETCore操作RabbitMQ的詳細內(nèi)容,更多關(guān)于.NETCore 操作RabbitMQ的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
c#使用nsoup解析html亂碼解決方法分享 nsoup教程
NSoup是JSoup的Net移植版本。使用方法基本一致。如果項目涉及HTML的處理,強烈推薦NSoup。但是遺憾的是NSoup默認的編碼是UTF-8,處理中文有亂碼,下面給出二種解決方法2014-01-01在類庫或winform項目中打開另一個winform項目窗體的方法
這篇文章主要介紹了在類庫或winform項目中打開另一個winform項目窗體的方法,可以實現(xiàn)Winform項目間窗體的調(diào)用,在進行Winform項目開發(fā)中非常具有實用價值,需要的朋友可以參考下2014-11-11Unity UI或3D場景實現(xiàn)跟隨手機陀螺儀的晃動效果
這篇文章主要介紹了Unity UI或3D場景實現(xiàn)跟隨手機陀螺儀的晃動效果,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-11-11.NET中的async和await關(guān)鍵字使用及Task異步調(diào)用實例
這篇文章主要介紹了.NET中的async和await關(guān)鍵字使用及Task異步調(diào)用實例,本文還包含了取消執(zhí)行和顯示進度的例子,需要的朋友可以參考下2014-07-07