c#基于Redis實現(xiàn)輕量級消息組件的步驟
最近在開發(fā)一個輕量級ASP.NET MVC開發(fā)框架,需要加入日志記錄,郵件發(fā)送,短信發(fā)送等功能,為了保持模塊的獨立性,所以需要通過消息通信的方式進行處理,為了保持框架在部署,使用,二次開發(fā)過程中的簡易便捷性,所以沒有選擇傳統(tǒng)的MQ,而是基于Redis的訂閱發(fā)布實現(xiàn)一個系統(tǒng)內部消息組件,話不多說,上碼!
數(shù)據(jù)結構定義
消息實體包含幾個部分,訂閱通道名稱,信息頭,信息體,信息差異化額外信息字典,信息頭主要包含消息標識,消息日期,信息體包含信息內容,信息實體類型等
public class Message { public string MessageChannel { set; get; } public MessageHead @MessageHead { set; get; } public MessageBody @MessageBody { set; get; } [JsonExtensionData] public Dictionary<string,Object> @MessageExtra { set; get; } public Message() { } public void AddExtra(string Name, string Value) { if (@MessageExtra == null) { @MessageExtra = new Dictionary<string, object>(); } @MessageExtra.Add(Name, Value); } public Object GetExtra(string Name) { return @MessageExtra[Name]; } } public class MessageHead { public string MessageID { set; get; } public DateTime MessageDate { set; get; } public MessageHead() { MessageID = CommonUtil.CreateCommonGuid(); MessageDate = DateTime.Now; } } public class MessageBody { public string MessageJsonContent { set; get; } public Type MessageMapperType { set; get; } }
注:因為消息訂閱發(fā)布傳遞過程中,我是通過Json序列化傳輸?shù)?,使用過程中可能需要一些額外的鍵值對信息,這里在對象中定義的是Dictinary對象,但是Dictinary本身是不支持序列化的,所以需要加上注解JsonExtensionData
訂閱通道聲明
我們需要達到的效果是,在系統(tǒng)啟動時,所有消息通道可以根據(jù)系統(tǒng)中的應用自動訂閱,這里就需要一個注解來標識我們的訂閱通道接收消息的實現(xiàn)類
[AttributeUsage(AttributeTargets.Class)] public class MessageChanelAttribute : Attribute { private string _ChannleName; public string ChannelName { get { return this._ChannleName; } set { this._ChannleName = value; } } }
消息的個性化策略處理
Redis的三方庫我這里使用的是StackExchange.Redis.dll,在消息訂閱時,需要為Channel指定接收到消息時的處理委托,我們在自動訂閱的過程中肯定也要收集好各類消息處理類并與Channel一一對應,這時候我們就需要一個基類FastDefaultMessageHandler,我們的具體的消息處理類繼承自FastDefaultMessageHandler,重寫處理方法即可
[Component] [MessageChanelAttribute(ChannelName = "DefaultMessage")] public class FastDefaultMessageHandler : IFastMessageHandle { [AutoWired] public DBUtil @DBUtil; public void HandleMessage(RedisChannel ChannelName, RedisValue Message) { FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message); try { if (!CheckMessageIsConsume(Entity)) { this.CustomHandle(Entity); } } catch (Exception e) { StringBuilder ExceptionLog = new StringBuilder(); ExceptionLog.AppendFormat("異常Message所屬Channel:{0}", Entity.MessageChannel + Environment.NewLine); ExceptionLog.AppendFormat("異常Message插入時間:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine); ExceptionLog.AppendFormat("異常Message內容:{0}", Message + Environment.NewLine); ExceptionLog.AppendFormat("異常信息:{0}", e.Message + Environment.NewLine); LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine); ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine); MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity); } finally { MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID); } } public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message) { } public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message) { return false; } }
其中的HandleMessage方法就是我們在訂閱Channel時對應的委托,會調用類中的CustomHandle的虛方法,子類繼承重寫該方法就會基于多態(tài)進行策略調用,CheckMessageIsConsume方法是用于確認消息是否重復消費的,也可以被重寫,下面看一個訪問日志類的實例,使用MessageChanelAttribute標注聲明該實現(xiàn)類需要訂閱發(fā)布的Channel名稱為Visit,CustomHandle方法中實現(xiàn)了插入數(shù)據(jù)庫操作,CheckMessageIsConsume方法判斷該條日志數(shù)據(jù)是否已消費(已經存在于數(shù)據(jù)庫)
[MessageChanelAttribute(ChannelName = "Visit")] public class VisitLog : FastDefaultMessageHandler { public override void CustomHandle(Message.Design.Message Message) { Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent); @DBUtil.Insert(LogEntity); base.CustomHandle(Message); } public override bool CheckMessageIsConsume(Message.Design.Message Message) { Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent); DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid); if (Row.IsExist()) { return true; } else { return false; } } }
消息自動訂閱
我們希望系統(tǒng)在啟動時就尋找出定義好Channel和實現(xiàn)類,自動實現(xiàn)訂閱,這里就需要用到IOC容器,啟動系統(tǒng)時將所有的消息處理類放入容器中,在自動訂閱時全部取出來,根據(jù)消息處理類中聲明的Channel名稱進行自動訂閱
public void Init() { List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle)); foreach (Type HandlerType in HandlerTypeList) { MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute; RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage); } }
注:
1.這里的IOC容器是我自己實現(xiàn)的,地址:https://gitee.com/grassprogramming/FastIOC,大家可以用AutoFac代替
2.RedisUtil是對StackExchange.Redis.dll封裝的處理類,地址:https://gitee.com/grassprogramming/FastUtil
消息發(fā)送
消息只需要調用Redis的發(fā)布方法即可,將Channel名稱與定義好的數(shù)據(jù)實體類傳入,序列化為Json
public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null) { FastExecutor.Message.Design.Message MessageEntity = new Design.Message(); MessageEntity.MessageChannel = ChannleName; MessageHead Head = new MessageHead(); MessageBody Body = new MessageBody(); Body.MessageMapperType = typeof(T); Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity); MessageEntity.MessageHead = Head; MessageEntity.MessageBody = Body; if (ExtraData != null) { foreach (var item in ExtraData) { MessageEntity.AddExtra(item.Key, item.Value); } } RedisUtil.Publish(ChannleName, MessageEntity); MessageACK.CopyMessageToACKList(ChannleName, MessageEntity); }
消息確認與存儲
Redis作訂閱發(fā)布模式作為消息組件的問題有兩方面
問題:消息消費完沒有確認機制
解決方案
基于Redis的Hash存儲方式建立一個消息存儲字段,在發(fā)送消息時拷貝到消息Hash字典中,消費完畢后再刪除,對應SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本質就是對Hash字典的增加與刪除功能
問題:消息處理端掛了再次重啟消息會丟失
解決方案
確認機制已經保證了消息即使沒有被消費完但是處理端宕機消息也不會丟失,需要注意的是,消息沒有丟失僅僅是Hash字典中有存儲,但是消息通道中不存在了,所以我們在系統(tǒng)每次啟動時掃描這個Hash字典,重新發(fā)布消息到Channel,這樣可能導致重復消費,所以需要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判斷,同時消息處理者本身處理異常我們也需要記錄下來,比如發(fā)短信供應商接口有問題,消息處理異常會進入Redis的ChannelException通道,我們可以根據(jù)需求實現(xiàn)一個可視化界面決定是否通過手動恢復
最后
Message組件相關代碼地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message
存在不足問題:如果消息是單純記錄日志問題,沒辦法確認消息是否消費了
如果大家有什么好的建議,可留言一起交流學習,共同進步
以上就是c#基于Redis實現(xiàn)輕量級消息組件的步驟的詳細內容,更多關于c#基于Redis實現(xiàn)消息組件的資料請關注腳本之家其它相關文章!
相關文章
C#中Predicate<T>與Func<T, bool>泛型委托的用法實例
這篇文章主要介紹了C#中Predicate<T>與Func<T, bool>泛型委托的用法,指出了其用法中的誤區(qū)及易錯點,有助于更好的理解泛型委托的用法,需要的朋友可以參考下2014-09-09VS2019下安裝和破解?DevExpress?19.2?插件的詳細教程
這篇文章主要介紹了VS2019?安裝并破解?DevExpress?19.2?插件的詳細教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-03-03winform dateTime數(shù)據(jù)類型轉換方法
這篇文章主要介紹了winform dateTime數(shù)據(jù)類型轉換方法,需要的朋友可以參考下2017-02-02C#動態(tài)對象(dynamic)詳解(實現(xiàn)方法和屬性的動態(tài))
下面小編就為大家?guī)硪黄狢#動態(tài)對象(dynamic)詳解(實現(xiàn)方法和屬性的動態(tài))。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02