C#利用RabbitMQ實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息傳輸
消息隊(duì)列模型
所有 MQ 產(chǎn)品從模型抽象上來說都是一樣的過程:
消費(fèi)者(consumer)訂閱某個(gè)隊(duì)列。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊(duì)列(queue)中,最后將消息發(fā)送到監(jiān)聽的消費(fèi)者。
RabbitMQ設(shè)置
RabbitMQ是通過交換機(jī)將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)隊(duì)列,所以隊(duì)列需要和交換機(jī)進(jìn)行綁定。本例將隊(duì)列綁定到默認(rèn)的amq.direct交換機(jī),并設(shè)置Routing key,如下圖所示:
RabbitMQ動(dòng)態(tài)庫(kù)安裝
通過NuGet包管理器進(jìn)行安裝RabbitMQ.Client,如下所示:
RabbitMQ.Client相關(guān)知識(shí)點(diǎn)
- ConnectionFactory:構(gòu)造一個(gè)實(shí)例,主要?jiǎng)?chuàng)建連接。
- IConnection:表示一個(gè)基于AMQP協(xié)議的連接。
- IModel:表示一個(gè)RabbitMQ通道,可用于聲明一個(gè)隊(duì)列,然后開始消費(fèi)。
- EventingBasicConsumer:基于獨(dú)立事件監(jiān)聽的基礎(chǔ)消費(fèi)者,可以監(jiān)聽并接收消息。
- 生產(chǎn)者基本步驟:1. 創(chuàng)建連接 2. 基于連接創(chuàng)建通道 3. 基于通道聲明隊(duì)列,4. 開始生產(chǎn)并發(fā)布消息
- 消費(fèi)者基本步驟:1. 創(chuàng)建連接 2. 基于連接創(chuàng)建通道 3. 基于通道聲明隊(duì)列,4. 創(chuàng)建消費(fèi)者,5. 綁定通道和消費(fèi)者,并開始消費(fèi)
示例效果圖
本例主要有一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,通過消息隊(duì)列進(jìn)行消息轉(zhuǎn)發(fā)和接收。
生產(chǎn)者負(fù)責(zé)消息發(fā)送,如下圖所示:
消費(fèi)者負(fù)責(zé)消息接收,如下圖所示:
核心代碼
代碼結(jié)構(gòu):主要包括生產(chǎn)者,消費(fèi)者,公共基礎(chǔ)代碼,如下所示:
RabbitMqHelper主要?jiǎng)?chuàng)建連接,如下所示:
public class RabbitMqHelper { /// <summary> /// 創(chuàng)建連接 /// </summary> /// <returns></returns> public IConnection GetConnection() { try { var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/ShortMsgHost" }; var conn = factory.CreateConnection(); return conn; } catch (Exception ex) { throw ex; } } }
RabbmitMqSendHelper用于發(fā)送消息,如下所示:
public class RabbmitMqSendHelper : RabbitMqHelper { /// <summary> /// 發(fā)送消息 /// </summary> /// <param name="msg"></param> /// <returns></returns> public bool SendMsg(string msg) { try { using (var conn = GetConnection()) { using (var channel = conn.CreateModel()) { channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "amq.direct", routingKey: "ShortMsgKey", basicProperties: null, body: body); //Console.WriteLine(" [x] Sent {0}", message); }; }; return true; } catch (Exception ex) { throw ex; } } }
RabbitMqReceiveHelper主要用于接收信息,如下所示:
public class RabbitMqReceiveHelper : RabbitMqHelper { public RabbitMqReceiveEventHandler OnReceiveEvent; private IConnection conn; private IModel channel; private EventingBasicConsumer consumer; public bool StartReceiveMsg() { try { conn = GetConnection(); channel = conn.CreateModel(); channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //Console.WriteLine(" [x] Received {0}", message); if (OnReceiveEvent != null) { OnReceiveEvent(message); } }; channel.BasicConsume(queue: "ShortMsgQueue", autoAck: true, consumer: consumer); return true; } catch (Exception ex) { throw ex; } } }
作者:Alan.hsiang
出處:http://www.cnblogs.com/hsiang/
以上就是C#利用RabbitMQ實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息傳輸?shù)膶?shí)現(xiàn)示例的詳細(xì)內(nèi)容,更多關(guān)于c# 用RabbitMQ實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息傳輸?shù)馁Y料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- C#?RabbitMQ的使用詳解
- C#通過rabbitmq實(shí)現(xiàn)定時(shí)任務(wù)(延時(shí)隊(duì)列)
- C#用RabbitMQ實(shí)現(xiàn)消息訂閱與發(fā)布
- C#使用RabbitMq隊(duì)列(Sample,Work,Fanout,Direct等模式的簡(jiǎn)單使用)
- c# rabbitmq 簡(jiǎn)單收發(fā)消息的示例代碼
- C#調(diào)用RabbitMQ實(shí)現(xiàn)消息隊(duì)列的示例代碼
- C#操作RabbitMQ的完整實(shí)例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- C#使用RabbitMQ發(fā)送和接收消息工具類的實(shí)現(xiàn)
相關(guān)文章
C#/VB.NET 在PDF中添加文件包(Portfolio)的方法
這篇文章主要介紹了C#/VB.NET 在PDF中添加文件包(Portfolio)的方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-06-06C#中XmlTextWriter讀寫xml文件詳細(xì)介紹
.NET中包含了很多支持XML的類,這些類使得程序員使用XML編程就如同理解XML文件一樣簡(jiǎn)單。在這篇文章中,我將給出這樣的一個(gè)類的使用示例,這個(gè)類就是XmlTextWriter類2013-04-04