C#實現(xiàn)事件總線的方法示例
EventBus(事件總線)是一種用于在應(yīng)用程序內(nèi)部或跨應(yīng)用程序組件之間進行事件通信的機制。
它允許不同的組件通過發(fā)布和訂閱事件來進行解耦和通信。在給定的代碼片段中,我們可以看到一個使用C#實現(xiàn)的Event Bus。它定義了一些接口和類來實現(xiàn)事件的發(fā)布和訂閱。
首先,我們有兩個基本的約束接口:IEvent
和IAsyncEventHandler<TEvent>
。
IEvent是一個空接口,用于約束事件的類型。IAsyncEventHandler<TEvent>
是一個泛型接口,用于約束事件處理程序的類型。它定義了處理事件的異步方法HandleAsync和處理異常的方法HandleException。接下來,我們有一個IEventBus接口,它定義了一些操作方法用于發(fā)布和訂閱事件。
其中,Publish<TEvent>
和PublishAsync<TEvent>
方法用于發(fā)布事件,而OnSubscribe<TEvent>
方法用于訂閱事件。然后,我們看到一個實現(xiàn)了本地事件總線的類LocalEventBusManager<TEvent>
。它實現(xiàn)了ILocalEventBusManager<TEvent>
接口,用于在單一管道內(nèi)處理本地事件。它使用了一個Channel<TEvent>
來存儲事件,并提供了發(fā)布事件的方法Publish
和PublishAsync
。此外,它還提供了一個自動處理事件的方法AutoHandle
。
總的來說Event Bus
提供了一種方便的方式來實現(xiàn)組件之間的松耦合通信。
通過發(fā)布和訂閱事件,組件可以獨立地進行操作,而不需要直接依賴于彼此的實現(xiàn)細節(jié)。
這種機制可以提高代碼的可維護性和可擴展性。
Github倉庫地址:https://github.com/DonPangPang/soda-event-bus
實現(xiàn)一些基本約束
先實現(xiàn)一些約束,實現(xiàn)IEvent
約束事件,實現(xiàn)IAsyncEvnetHandler<TEvent> where TEvent:IEvent
來約束事件的處理程序。
public interface IEvent { } public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent { Task HandleAsync(IEvent @event); void HandleException(IEvent @event, Exception ex); }
接下來規(guī)定一下咱們的IEventBus
,會有哪些操作方法。基本就是發(fā)布和訂閱。
public interface IEventBus { void Publish<TEvent>(TEvent @event) where TEvent : IEvent; Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent; void OnSubscribe<TEvent>() where TEvent : IEvent; }
實現(xiàn)一個本地事件總線
本地事件處理
本地事件的處理我打算采用兩種方式實現(xiàn),一種是LocalEventBusManager
即本地事件管理,第二種是LocalEventBusPool
池化本地事件。
LocalEvnetBusManager
LocalEventBusManager
主要在單一管道內(nèi)進行處理,集中進行消費。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent { void Publish(TEvent @event); Task PublishAsync(TEvent @event) ; void AutoHandle(); } public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent> where TEvent: IEvent { readonly IServiceProvider _servicesProvider = serviceProvider; private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>(); public void Publish(TEvent @event) { Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null"); _eventChannel.Writer.WriteAsync(@event); } private CancellationTokenSource Cts { get; } = new(); public void Cancel() { Cts.Cancel(); } public async Task PublishAsync(TEvent @event) { await _eventChannel.Writer.WriteAsync(@event); } public void AutoHandle() { // 確保只啟動一次 if (!Cts.IsCancellationRequested) return; Task.Run(async () => { while (!Cts.IsCancellationRequested) { var reader = await _eventChannel.Reader.ReadAsync(); await HandleAsync(reader); } }, Cts.Token); } async Task HandleAsync(TEvent @event) { var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>(); if (handler is null) { throw new NullReferenceException($"No handler for event {@event.GetType().Name}"); } try { await handler.HandleAsync(@event); } catch (Exception ex) { handler.HandleException( @event, ex); } } }
LocalEventBusPool
LocalEventBusPool
即所有的Event都會有一個單獨的管道處理,單獨消費處理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider) { private readonly IServiceProvider _serviceProvider = serviceProvider; private class ChannelKey { public required string Key { get; init; } public int Subscribers { get; set; } public override bool Equals(object? obj) { if (obj is ChannelKey key) { return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase); } return false; } public override int GetHashCode() { return 0; } } private Channel<IEvent> Rent(string channel) { _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value); if (value != null) return value; value = Channel.CreateUnbounded<IEvent>(); _channels.TryAdd(new ChannelKey() { Key = channel }, value); return value; } private Channel<IEvent> Rent(ChannelKey channelKey) { _channels.TryGetValue(channelKey, out var value); if (value != null) return value; value = Channel.CreateUnbounded<IEvent>(); _channels.TryAdd(channelKey, value); return value; } private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new(); private CancellationTokenSource Cts { get; } = new(); public void Cancel() { Cts.Cancel(); _channels.Clear(); Cts.TryReset(); } public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent { await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event); } public void Publish<TEvent>(TEvent @event) where TEvent : IEvent { Rent(typeof(TEvent).Name).Writer.TryWrite(@event); } public void OnSubscribe<TEvent>() where TEvent : IEvent { var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ?? new ChannelKey() { Key = typeof(TEvent).Name }; channelKey.Subscribers++; Task.Run(async () => { try { while (!Cts.IsCancellationRequested) { var @event = await ReadAsync(channelKey); var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>(); if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}"); try { await handler.HandleAsync((TEvent)@event); } catch (Exception ex) { handler.HandleException((TEvent)@event, ex); } } } catch (Exception e) { throw new InvalidOperationException("Error on onSubscribe handler", e); } }, Cts.Token); } private async Task<IEvent> ReadAsync(string channel) { return await Rent(channel).Reader.ReadAsync(Cts.Token); } private async Task<IEvent> ReadAsync(ChannelKey channel) { return await Rent(channel).Reader.ReadAsync(Cts.Token); } }
LocalEventBus
實現(xiàn)LocalEventBus
繼承自IEventBus
即可,如果有需要擴展的方法自行添加,池化和管理器的情況單獨處理。
public interface ILocalEventBus: IEventBus { } public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus { private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>(); public void Publish<TEvent>(TEvent @event) where TEvent : IEvent { if (options.Pool) { Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null"); EventBusPool.Publish(@event); } else { var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); manager.Publish(@event); } } public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent { if (options.Pool) { Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null"); await EventBusPool.PublishAsync(@event); } else { var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); await manager.PublishAsync(@event); } } public void OnSubscribe<TEvent>() where TEvent : IEvent { if (options.Pool) { Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null"); EventBusPool.OnSubscribe<TEvent>(); } else { var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>(); if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it."); manager.AutoHandle(); } } }
分布式事件總線
根據(jù)需要擴展即可,基本邏輯相同,但可能需要增加確認機制等。
到此這篇關(guān)于C#實現(xiàn)事件總線的方法示例的文章就介紹到這了,更多相關(guān)C# 事件總線內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Unity通用泛型單例設(shè)計模式(普通型和繼承自MonoBehaviour)
這篇文章主要介紹了Unity通用泛型單例設(shè)計模式,分為普通型和繼承MonoBehaviour,幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07C#將布爾類型轉(zhuǎn)換成字節(jié)數(shù)組的方法
這篇文章主要介紹了C#將布爾類型轉(zhuǎn)換成字節(jié)數(shù)組的方法,涉及C#中字符串函數(shù)的使用技巧,非常具有實用價值,需要的朋友可以參考下2015-04-04C#實現(xiàn)將Doc文檔轉(zhuǎn)換成rtf格式的方法示例
這篇文章主要介紹了C#實現(xiàn)將Doc文檔轉(zhuǎn)換成rtf格式的方法,結(jié)合實例形式分析了C#針對word文件的讀取及文檔格式轉(zhuǎn)換相關(guān)操作技巧,需要的朋友可以參考下2017-07-07