C#使用RabbitMQ發(fā)送和接收消息工具類的實現
更新時間:2023年12月19日 11:36:48 作者:讓夢想瘋狂
RabbitMQ是一個消息的代理器,用于接收和發(fā)送消息,本文主要介紹了C#使用RabbitMQ發(fā)送和接收消息工具類的實現,具有一定的參考價值,感興趣的可以了解一下
下面是一個簡單的 C# RabbitMQ 發(fā)送和接收消息的封裝工具類的示例代碼:
工具類
通過NuGet安裝RabbitMQ.Client
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace WorkerService1
{
public class RabbitMQHelper : IDisposable
{
private readonly ConnectionFactory _factory;
private IConnection _connection;
private IModel _channel;
public RabbitMQHelper()
{
// 設置連接參數
_factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
}
/// <summary>
/// 發(fā)送消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queueName"></param>
/// <param name="message"></param>
public void SendMessage<T>(string queueName, T message)
{
try
{
InitConnection();
// 聲明隊列
_channel.QueueDeclare(queue: queueName,
durable: true,// 設置為true表示隊列是持久化的
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
_channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
catch (Exception ex)
{
Console.WriteLine("Failed to send message: {0}", ex.Message);
}
}
/// <summary>
/// 接收消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queueName"></param>
/// <param name="messageHandler"></param>
public void ReceiveMessage<T>(string queueName, Action<T> messageHandler)
{
try
{
InitConnection();
// 聲明隊列(接收需聲明隊列,否則隊列不存在時,無法接收消息)
_channel.QueueDeclare(queue: queueName,
durable: true, // 設置為true表示隊列是持久化的
exclusive: false,
autoDelete: false,
arguments: null);
//設置消費者數量(并發(fā)度),每個消費者每次只能處理一條消息
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 創(chuàng)建消費者
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
try
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var convertedMessage = JsonConvert.DeserializeObject<T>(message);
//委托方法
messageHandler.Invoke(convertedMessage);
// 消息處理成功,確認消息
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
// 消息處理異常,確認消息
_channel.BasicAck(ea.DeliveryTag, false);
}
};
_channel.BasicConsume(queue: queueName,
autoAck: false,// 設置為true表示自動確認消息
consumer: consumer);
}
catch (Exception ex)
{
Console.WriteLine("Failed to receive message: {0}", ex.Message);
}
}
/// <summary>
/// 初始化鏈接
/// </summary>
private void InitConnection()
{
if (_connection == null || !_connection.IsOpen)
{
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
}
}
/// <summary>
/// 釋放資源
/// </summary>
public void Dispose()
{
_channel?.Close();
_channel?.Dispose();
_connection?.Close();
_connection?.Dispose();
}
}
}
使用示例
using System;
using System.Text;
using System.Threading.Tasks;
using WorkerService1;
public class Program
{
private static string QueueName = "myqueue_key";
public static void Main()
{
var rabbitMQHelper = new RabbitMQHelper();
for (long i = 0; i < 30; i++)
{
rabbitMQHelper.SendMessage(QueueName, i);
}
rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);
Console.ReadLine();
}
/// <summary>
/// 接收處理
/// </summary>
/// <param name="index"></param>
private static void ReceivedHandle(long index)
{
try
{
Console.WriteLine($"第{index}次開始{DateTime.Now}");
Thread.Sleep(2000);
Console.WriteLine($"第{index}次結束{DateTime.Now}");
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}到此這篇關于C#使用RabbitMQ發(fā)送和接收消息工具類的實現的文章就介紹到這了,更多相關C# RabbitMQ發(fā)送和接收內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

