.NETCore基于RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩方法
前言
此文章用來記錄自己學(xué)習(xí)延時(shí)隊(duì)列過程的文章,并用.NET這兩種方式實(shí)現(xiàn)了簡(jiǎn)單的Demo。
延時(shí)隊(duì)列的應(yīng)用場(chǎng)景 應(yīng)用下單后,30分鐘沒有支付的話,則自動(dòng)取消訂單活動(dòng)開始前30分鐘,提醒參賽者參加活動(dòng)。活動(dòng)結(jié)束后,30分鐘后提醒未進(jìn)行評(píng)價(jià)的參賽人員進(jìn)行評(píng)價(jià)…
上述的場(chǎng)景都可以使用延時(shí)隊(duì)列進(jìn)行對(duì)應(yīng)的處理。
上面的場(chǎng)景雖說可以通過定時(shí)器也可以處理,但有點(diǎn)浪費(fèi)資源, 而上述的場(chǎng)景時(shí)間是不定的,例如有兩個(gè)活動(dòng)需要提醒參賽者參加,一個(gè)是7點(diǎn)開始 ,另一個(gè)是8點(diǎn)開始,那么觸發(fā)處理的一個(gè)是6點(diǎn)半,一個(gè)是7點(diǎn)半。
實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式
使用Rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列可以讓消息持久化,也支持分布式
缺點(diǎn) | |
---|---|
第一種 | 第一種方式的缺陷以及解決方案 |
第二種 | 這個(gè)插件的當(dāng)前設(shè)計(jì)并不真正適合具有大量延遲消息(例如成百上千或數(shù)百萬)的場(chǎng)景。詳情信息 |
利用rabbitmq死信隊(duì)列x-dead-letter-exchange和x-dead-letter-routing-key
實(shí)現(xiàn)需要?jiǎng)?chuàng)建兩對(duì)交換機(jī)和隊(duì)列,其中需要對(duì)其中一對(duì)的隊(duì)列進(jìn)行設(shè)置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉(zhuǎn)發(fā)到另一對(duì)的交換機(jī),
隨后實(shí)現(xiàn)流程圖如下:
.NETCore實(shí)現(xiàn)方式
項(xiàng)目:.NET Core 控制臺(tái)項(xiàng)目
install-package RabbitMQ.Client
生產(chǎn)者代碼:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創(chuàng)建連接 var connection = connectionFactory.CreateConnection(); //創(chuàng)建通道 var channl = connection.CreateModel(); //指定隊(duì)列的x-dead-letter-exchange和x-dead-letter-routing-key Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延時(shí)的交換機(jī)和隊(duì)列綁定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //業(yè)務(wù)的交換機(jī)和隊(duì)列綁定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); Console.WriteLine("生產(chǎn)者開始發(fā)送消息"); while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); var properties = channl.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "5000"; //發(fā)送一條延時(shí)5秒的消息 channl.BasicPublish("exchange.business.dlx", "", properties, body); }
消費(fèi)者
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創(chuàng)建連接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //給消費(fèi)時(shí)添加一個(gè)委托 consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); //打印消費(fèi)的消息 Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; //消費(fèi)queue.business.test隊(duì)列的消息 channel.BasicConsume("queue.business.test", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
實(shí)現(xiàn)效果:
rabbitmq通過安裝插件的形式實(shí)現(xiàn)(推薦)
使用rabbitmq_delayed_message_exchange
插件提供的x-delayed-message
類型的交換機(jī)
下載插件的地址:https://www.rabbitmq.com/community-plugins.html
選中rabbitmq_delayed_message_exchange插件
該插件使用只需要聲明交換機(jī)的時(shí)候,指定x-delayed-message
類型,然后添加x-delayed-type
參數(shù)即可
.NET Core 實(shí)現(xiàn)
生產(chǎn)者
ConnectionFactory connectionFactory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); Dictionary<string, object> exchangeArgs = new Dictionary<string, object>() { {"x-delayed-type","direct" } }; //指定x-delayed-message 類型的交換機(jī),并且添加x-delayed-type屬性 channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs); channel.QueueDeclare("plug.delay.queue", true, false, false, null); channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay"); var properties = channel.CreateBasicProperties(); Console.WriteLine("生產(chǎn)者開始發(fā)送消息"); Dictionary<string, object> headers = new Dictionary<string, object>() { {"x-delay","5000" } }; properties.Persistent = true; properties.Headers = headers; while (true) { string message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body); }
消費(fèi)者:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創(chuàng)建連接 var connection = connectionFactory.CreateConnection(); var channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (obj, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine(message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume("plug.delay.queue", false, consumer); Console.ReadKey(); channel.Dispose(); connection.Close();
實(shí)現(xiàn)效果:
第一種方式的缺陷以及解決方案
如果存在A、B消息進(jìn)入了隊(duì)列中,A在前,B在后,如果B消息的過期時(shí)間比A的過期時(shí)間要早,消費(fèi)的時(shí)候,并不會(huì)先消費(fèi)B,再消費(fèi)A,而是B會(huì)等A先消費(fèi),即使A要晚過期
舉例
生產(chǎn)者代碼修改成如下:
ConnectionFactory connectionFactory = new ConnectionFactory { UserName = "guest", Password = "guest", HostName = "127.0.0.1" }; //創(chuàng)建連接 var connection = connectionFactory.CreateConnection(); //創(chuàng)建通道 var channl = connection.CreateModel(); Dictionary<string, object> queueArgs = new Dictionary<string, object>() { { "x-dead-letter-exchange","exchange.business.test" }, {"x-dead-letter-routing-key","businessRoutingkey" } }; //延時(shí)的交換機(jī)和隊(duì)列綁定 channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null); channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs); channl.QueueBind("queue.business.dlx", "exchange.business.dlx", ""); //業(yè)務(wù)的交換機(jī)和隊(duì)列綁定 channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null); channl.QueueDeclare("queue.business.test", true, false, false, null); channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null); string message1 = "Hello Word!1"; string message2 = "Hello Word!2"; var body1 = Encoding.UTF8.GetBytes(message1); var body2 = Encoding.UTF8.GetBytes(message2); var properties = channl.CreateBasicProperties(); properties.Persistent = true; //先發(fā)送過期時(shí)間5秒的消息 properties.Expiration = "5000"; channl.BasicPublish("exchange.business.dlx", "", properties, body2); //再發(fā)送過期時(shí)間3秒的消息 properties.Expiration = "3000"; channl.BasicPublish("exchange.business.dlx", "", properties, body1);
結(jié)果:
這里先發(fā)了延時(shí)20秒的A消息,然后又發(fā)了延時(shí)10秒的B消息,但是最終結(jié)果并不是先消費(fèi)了B消息,而是等A消息過期后,立刻再去消費(fèi)B。
這個(gè)會(huì)影響什么業(yè)務(wù)呢?好比兩個(gè)C、D活動(dòng),C活動(dòng)開始時(shí)間是7點(diǎn),D活動(dòng)開始時(shí)間是5點(diǎn),那么D活動(dòng)提醒需要等到C活動(dòng)提醒后,才會(huì)立刻提醒,這明顯不符合我們的業(yè)務(wù)需求。
解決方案 每個(gè)活動(dòng)都是單獨(dú)的創(chuàng)建自己的交換機(jī)和隊(duì)列使用第二種實(shí)現(xiàn)方式,即使用插件的形式。
第一種不太現(xiàn)實(shí),因?yàn)槿绻顒?dòng)多的話,則會(huì)創(chuàng)建很多的隊(duì)列,而且只會(huì)使用一次。
業(yè)務(wù)上還是推薦使用插件的實(shí)現(xiàn)方式。
第二種方式的效果
github地址:
https://github.com/MDZZ3/RabbitmqDelay
到此這篇關(guān)于.NETCore基于RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩方法的文章就介紹到這了,更多相關(guān).NETCore RabbitMQ 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 如何在一個(gè).NET?Core項(xiàng)目中使用RabbitMQ進(jìn)行即時(shí)消息管理
- .NET Core中RabbitMQ使用死信隊(duì)列的實(shí)現(xiàn)
- .Net?Core和RabbitMQ限制循環(huán)消費(fèi)的方法
- 運(yùn)用.net core中實(shí)例講解RabbitMQ高可用集群構(gòu)建
- 運(yùn)用.NetCore實(shí)例講解RabbitMQ死信隊(duì)列,延時(shí)隊(duì)列
- 運(yùn)用.net core中實(shí)例講解RabbitMQ
- .NET Core實(shí)現(xiàn)RabbitMQ消息隊(duì)列的示例代碼
相關(guān)文章
asp.net使用jquery實(shí)現(xiàn)搜索框默認(rèn)提示功能
這篇文章主要介紹了asp.net使用jquery實(shí)現(xiàn)搜索框默認(rèn)提示功能,大家參考使用吧2014-01-01.Net MVC實(shí)現(xiàn)長(zhǎng)輪詢
這篇文章主要為大家詳細(xì)介紹了.Net MVC實(shí)現(xiàn)長(zhǎng)輪詢的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06ASP.NET Core中修改配置文件后自動(dòng)加載新配置的方法詳解
這篇文章主要給大家介紹了關(guān)于ASP.NET Core中修改配置文件后自動(dòng)加載新配置的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用ASP.NET Core具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08ASP.NET Core擴(kuò)展庫(kù)之Http日志的使用詳解
這篇文章主要介紹了ASP.NET Core擴(kuò)展庫(kù)之Http日志的使用詳解,幫助大家更好的理解和學(xué)習(xí)使用.net技術(shù),感興趣的朋友可以了解下2021-04-04ASP.NET實(shí)現(xiàn)MVC中獲取當(dāng)前URL、controller及action的方法
這篇文章主要介紹了ASP.NET實(shí)現(xiàn)MVC中獲取當(dāng)前URL、controller及action的方法,結(jié)合實(shí)例形式分析了asp.net mvc獲取當(dāng)前URL、controller及action的具體實(shí)現(xiàn)技巧,需要的朋友可以參考下2017-02-02Asp.net core中實(shí)現(xiàn)自動(dòng)更新的Option的方法示例
這篇文章主要介紹了Asp.net core中實(shí)現(xiàn)自動(dòng)更新的Option的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03asp.net計(jì)算一串?dāng)?shù)字中每個(gè)數(shù)字出現(xiàn)的次數(shù)
計(jì)算一串?dāng)?shù)字中每個(gè)數(shù)字出現(xiàn)的次數(shù),可以這樣子,先判斷輸入的字符串是不是數(shù)字組成,還是否包含有其它字符2012-05-05