.net平臺的rabbitmq使用封裝demo詳解
前言
RabbitMq大家再熟悉不過,這篇文章主要針對rabbitmq學(xué)習(xí)后封裝RabbitMQ.Client的一個分享。文章最后,我會把封裝組件和demo奉上。
什么是rabbitMQ
RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue 高級消息隊列協(xié)議 )的開源實現(xiàn),能夠?qū)崿F(xiàn)異步消息處理
RabbitMQ是一個消息代理:它接受和轉(zhuǎn)發(fā)消息。
你可以把它想象成一個郵局:當(dāng)你把你想要發(fā)布的郵件放在郵箱中時,你可以確定郵差先生最終將郵件發(fā)送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ和郵局的主要區(qū)別在于它不處理紙張,而是接受,存儲和轉(zhuǎn)發(fā)二進制數(shù)據(jù)塊
優(yōu)點:異步消息處理
業(yè)務(wù)解耦(下訂單操作:扣減庫存、生成訂單、發(fā)紅包、發(fā)短信),將下單操作主流程:扣減庫存、生成訂單,然后通過MQ消息隊列完成通知,發(fā)紅包、發(fā)短信,錯峰流控 (通知量 消息量 訂單量大的情況實現(xiàn)MQ消息隊列機制,淡季情況下訪問量會少)
靈活的路由(Flexible Routing)
在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實現(xiàn)。針對更復(fù)雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現(xiàn)自己的 Exchange 。
RabbitMQ網(wǎng)站端口號:15672
程序里面實現(xiàn)的端口為:5672
Rabbitmq的關(guān)鍵術(shù)語
1、綁定器(Binding):根據(jù)路由規(guī)則綁定Queue和Exchange。
2、路由鍵(Routing Key):Exchange根據(jù)關(guān)鍵字進行消息投遞。
3、交換機(Exchange):指定消息按照路由規(guī)則進入指定隊列
4、消息隊列(Queue):消息的存儲載體
5、生產(chǎn)者(Producer):消息發(fā)布者。
6、消費者(Consumer):消息接收者。
Rabbitmq的運作
從下圖可以看出,發(fā)布者(Publisher)是把消息先發(fā)送到交換器(Exchange),再從交換器發(fā)送到指定隊列(Queue),而先前已經(jīng)聲明交換器與隊列綁定關(guān)系,最后消費者(Customer)通過訂閱或者主動取指定隊列消息進行消費。
那么剛剛提到的訂閱和主動取可以理解成,推(被動),拉(主動)。
推,只要隊列增加一條消息,就會通知空閑的消費者進行消費。(我不找你,就等你找我,觀察者模式)
拉,不會通知消費者,而是由消費者主動輪循或者定時去取隊列消息。(我需要才去找你)
使用場景我舉個例子,假如有兩套系統(tǒng) 訂單系統(tǒng)和發(fā)貨系統(tǒng),從訂單系統(tǒng)發(fā)起發(fā)貨消息指令,為了及時發(fā)貨,發(fā)貨系統(tǒng)需要訂閱隊列,只要有指令就處理。
可是程序偶爾會出異常,例如網(wǎng)絡(luò)或者DB超時了,把消息丟到失敗隊列,這個時候需要重發(fā)機制。但是我又不想while(IsPostSuccess == True),因為只要出異常了,會在某個時間段內(nèi)都會有異常,這樣的重試是沒意義的。
這個時候不需要及時的去處理消息,有個JOB定時或者每隔幾分鐘(失敗次數(shù)*間隔分鐘)去取失敗隊列消息,進行重發(fā)。
Publish(發(fā)布)的封裝
步驟:初始化鏈接->聲明交換器->聲明隊列->換機器與隊列綁定->發(fā)布消息。注意的是,我將Model存到了ConcurrentDictionary里面,因為聲明與綁定是非常耗時的,其次,往重復(fù)的隊列發(fā)送消息是不需要重新初始化的。
/// <summary> /// 交換器聲明 /// </summary> /// <param name="iModel"></param> /// <param name="exchange">交換器</param> /// <param name="type">交換器類型: /// 1、Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全 /// 匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的 /// 消息才被轉(zhuǎn)發(fā),不會轉(zhuǎn)發(fā)dog.puppy,也不會轉(zhuǎn)發(fā)dog.guard,只會轉(zhuǎn)發(fā)dog /// 2、Fanout Exchange – 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發(fā)送到交換機的消息都 /// 會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息。Fanout /// 交換機轉(zhuǎn)發(fā)消息是最快的。 /// 3、Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多 /// 個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” /// 只會匹配到“audit.irs”。</param> /// <param name="durable">持久化</param> /// <param name="autoDelete">自動刪除</param> /// <param name="arguments">參數(shù)</param> private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null) { exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim(); iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); } /// <summary> /// 隊列聲明 /// </summary> /// <param name="channel"></param> /// <param name="queue">隊列</param> /// <param name="durable">持久化</param> /// <param name="exclusive">排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見, /// 并在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基于連接可見的,同一連接的不同信道是可 /// 以同時訪問同一個連接創(chuàng)建的排他隊列的。其二,“首次”,如果一個連接已經(jīng)聲明了一個排他隊列,其他連 /// 接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關(guān)閉或者 /// 客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用于只限于一個客戶端發(fā)送讀取消息的應(yīng)用場景。</param> /// <param name="autoDelete">自動刪除</param> /// <param name="arguments">參數(shù)</param> private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary<string, object> arguments = null) { queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim(); channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); } /// <summary> /// 獲取Model /// </summary> /// <param name="exchange">交換機名稱</param> /// <param name="queue">隊列名稱</param> /// <param name="routingKey"></param> /// <param name="isProperties">是否持久化</param> /// <returns></returns> private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false) { return ModelDic.GetOrAdd(queue, key => { var model = _conn.CreateModel(); ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties); QueueDeclare(model, queue, isProperties); model.QueueBind(queue, exchange, routingKey); ModelDic[queue] = model; return model; }); } /// <summary> /// 發(fā)布消息 /// </summary> /// <param name="routingKey">路由鍵</param> /// <param name="body">隊列信息</param> /// <param name="exchange">交換機名稱</param> /// <param name="queue">隊列名</param> /// <param name="isProperties">是否持久化</param> /// <returns></returns> public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); try { channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8()); } catch (Exception ex) { throw ex.GetInnestException(); } }
下次是本機測試的發(fā)布速度截圖:
4.2W/S屬于穩(wěn)定速度,把反序列化(ToJson)會稍微快一些。
Subscribe(訂閱)的封裝
發(fā)布的時候是申明了交換器和隊列并綁定,然而訂閱的時候只需要聲明隊列就可。從下面代碼能看到,捕獲到異常的時候,會把消息送到自定義的“死信隊列”里,由另外的JOB進行定時重發(fā),因此,finally是應(yīng)答成功的。
/// <summary> /// 獲取Model /// </summary> /// <param name="queue">隊列名稱</param> /// <param name="isProperties"></param> /// <returns></returns> private static IModel GetModel(string queue, bool isProperties = false) { return ModelDic.GetOrAdd(queue, value => { var model = _conn.CreateModel(); QueueDeclare(model, queue, isProperties); //每次消費的消息數(shù) model.BasicQos(0, 1, false); ModelDic[queue] = model; return model; }); } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queue">隊列名稱</param> /// <param name="isProperties"></param> /// <param name="handler">消費處理</param> /// <param name="isDeadLetter"></param> public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class { //隊列聲明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq"); if (!isDeadLetter) PublishToDead<DeadLetterQueue>(queue, msgStr, ex); } finally { channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
下次是本機測試的發(fā)布速度截圖:
快的時候有1.9K/S,慢的時候也有1.7K/S
Pull(拉)的封裝
直接上代碼:
/// <summary> /// 獲取消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="handler">消費處理</param> private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class { var channel = GetModel(exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result.IsNull()) return; var msg = result.Body.DeserializeUtf8().FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq"); } finally { channel.BasicAck(result.DeliveryTag, false); } }
快的時候有1.8K/s,穩(wěn)定是1.5K/S
Rpc(遠程調(diào)用)的封裝
首先說明下,RabbitMq只是提供了這個RPC的功能,但是并不是真正的RPC,為什么這么說:
1、傳統(tǒng)Rpc隱藏了調(diào)用細節(jié),像調(diào)用本地方法一樣傳參、拋出異常
2、RabbitMq的Rpc是基于消息的,消費者消費后,通過新隊列返回響應(yīng)結(jié)果。
/// <summary> /// RPC客戶端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue, true, consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待響應(yīng)超時"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服務(wù)端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param name="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter) { //隊列聲明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq"); } finally { channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
可以用,但不建議去用。可以考慮其他的RPC框架。grpc、thrift等。
結(jié)尾
本篇文章,沒有過多的寫RabbitMq的知識點,因為園子的學(xué)習(xí)筆記實在太多了。下面把我的代碼奉上 https://github.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有發(fā)現(xiàn)寫得不對的地方麻煩在評論指出,我會及時修改以免誤導(dǎo)別人。
到此這篇關(guān)于.net平臺的rabbitmq使用封裝的文章就介紹到這了,更多相關(guān).net使用rabbitmq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
asp.net中EXCEL數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫的方法
這篇文章主要介紹了asp.net中EXCEL數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫的方法,實現(xiàn)讀取excel數(shù)據(jù)并導(dǎo)入到SQL Server數(shù)據(jù)庫的功能,是非常實用的技巧,需要的朋友可以參考下2015-01-01Asp.Net 通用數(shù)據(jù)操作類 (附通用數(shù)據(jù)基類)
以前經(jīng)常用php的數(shù)據(jù)操作類,這次的asp.net數(shù)據(jù)操作類,是個方法2008-07-07Windows Server 2012 R2 或 2016無法安裝.Net 3.5.1
這篇文章主要為大家詳細介紹了Windows Server 2012 R2 或 2016 無法安裝 .Net 3.5.1,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-02-02國產(chǎn)化中的?.NET?Core?操作達夢數(shù)據(jù)庫DM8的兩種方式(操作詳解)
這篇文章主要介紹了國產(chǎn)化之?.NET?Core?操作達夢數(shù)據(jù)庫DM8的兩種方式,這里提供兩種方式是傳統(tǒng)的DbHelperSQL方式和Dapper?方式,每種方式給大家介紹的非常詳細,需要的朋友可以參考下2022-04-04讓GridView只更新某些特定的數(shù)據(jù)的方法
我又不希望所有的數(shù)據(jù)都可以修改,只希望修改某些特定的列,用下面的方法即可2008-10-10HttpWebRequest的常見錯誤使用TcpClient可避免
有時使用HttpWebRequest對象會出現(xiàn)錯誤有三種服務(wù)器提交了協(xié)議沖突/基礎(chǔ)連接已經(jīng)關(guān)閉:連接被意外關(guān)閉/無法發(fā)送具有此謂詞類型的內(nèi)容正文,感興趣的朋友可以參考下本文2013-02-02.net webapi接收xml格式數(shù)據(jù)的3種情況小結(jié)
這篇文章主要給大家總結(jié)介紹了關(guān)于.net webapi接收xml格式數(shù)據(jù)的3種情況,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02