C#使用RabbitMQ發(fā)送和接收消息工具類的實現(xiàn)
更新時間:2023年12月19日 11:36:48 作者:讓夢想瘋狂
RabbitMQ是一個消息的代理器,用于接收和發(fā)送消息,本文主要介紹了C#使用RabbitMQ發(fā)送和接收消息工具類的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
下面是一個簡單的 C# RabbitMQ 發(fā)送和接收消息的封裝工具類的示例代碼:
工具類
通過NuGet安裝RabbitMQ.Client
using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Channels; using System.Threading.Tasks; namespace WorkerService1 { public class RabbitMQHelper : IDisposable { private readonly ConnectionFactory _factory; private IConnection _connection; private IModel _channel; public RabbitMQHelper() { // 設置連接參數(shù) _factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" }; } /// <summary> /// 發(fā)送消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queueName"></param> /// <param name="message"></param> public void SendMessage<T>(string queueName, T message) { try { InitConnection(); // 聲明隊列 _channel.QueueDeclare(queue: queueName, durable: true,// 設置為true表示隊列是持久化的 exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); _channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body); } catch (Exception ex) { Console.WriteLine("Failed to send message: {0}", ex.Message); } } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queueName"></param> /// <param name="messageHandler"></param> public void ReceiveMessage<T>(string queueName, Action<T> messageHandler) { try { InitConnection(); // 聲明隊列(接收需聲明隊列,否則隊列不存在時,無法接收消息) _channel.QueueDeclare(queue: queueName, durable: true, // 設置為true表示隊列是持久化的 exclusive: false, autoDelete: false, arguments: null); //設置消費者數(shù)量(并發(fā)度),每個消費者每次只能處理一條消息 _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 創(chuàng)建消費者 var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { try { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var convertedMessage = JsonConvert.DeserializeObject<T>(message); //委托方法 messageHandler.Invoke(convertedMessage); // 消息處理成功,確認消息 _channel.BasicAck(ea.DeliveryTag, false); } catch (Exception ex) { // 消息處理異常,確認消息 _channel.BasicAck(ea.DeliveryTag, false); } }; _channel.BasicConsume(queue: queueName, autoAck: false,// 設置為true表示自動確認消息 consumer: consumer); } catch (Exception ex) { Console.WriteLine("Failed to receive message: {0}", ex.Message); } } /// <summary> /// 初始化鏈接 /// </summary> private void InitConnection() { if (_connection == null || !_connection.IsOpen) { _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); } } /// <summary> /// 釋放資源 /// </summary> public void Dispose() { _channel?.Close(); _channel?.Dispose(); _connection?.Close(); _connection?.Dispose(); } } }
使用示例
using System; using System.Text; using System.Threading.Tasks; using WorkerService1; public class Program { private static string QueueName = "myqueue_key"; public static void Main() { var rabbitMQHelper = new RabbitMQHelper(); for (long i = 0; i < 30; i++) { rabbitMQHelper.SendMessage(QueueName, i); } rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle); Console.ReadLine(); } /// <summary> /// 接收處理 /// </summary> /// <param name="index"></param> private static void ReceivedHandle(long index) { try { Console.WriteLine($"第{index}次開始{DateTime.Now}"); Thread.Sleep(2000); Console.WriteLine($"第{index}次結束{DateTime.Now}"); } catch (Exception ex) { Console.WriteLine(ex.Message); } } }
到此這篇關于C#使用RabbitMQ發(fā)送和接收消息工具類的實現(xiàn)的文章就介紹到這了,更多相關C# RabbitMQ發(fā)送和接收內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
C# PC版微信消息監(jiān)聽自動回復的實現(xiàn)方法
這篇文章主要介紹了C# PC版微信消息監(jiān)聽自動回復的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-05-05C#開發(fā)Winform實現(xiàn)學生管理系統(tǒng)
這篇文章介紹了C#開發(fā)Winform實現(xiàn)學生管理系統(tǒng)的項目案例,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-05-05