C#實(shí)現(xiàn)事件總線的方法示例
EventBus(事件總線)是一種用于在應(yīng)用程序內(nèi)部或跨應(yīng)用程序組件之間進(jìn)行事件通信的機(jī)制。
它允許不同的組件通過(guò)發(fā)布和訂閱事件來(lái)進(jìn)行解耦和通信。在給定的代碼片段中,我們可以看到一個(gè)使用C#實(shí)現(xiàn)的Event Bus。它定義了一些接口和類(lèi)來(lái)實(shí)現(xiàn)事件的發(fā)布和訂閱。
首先,我們有兩個(gè)基本的約束接口:IEvent和IAsyncEventHandler<TEvent>。
IEvent是一個(gè)空接口,用于約束事件的類(lèi)型。IAsyncEventHandler<TEvent>是一個(gè)泛型接口,用于約束事件處理程序的類(lèi)型。它定義了處理事件的異步方法HandleAsync和處理異常的方法HandleException。接下來(lái),我們有一個(gè)IEventBus接口,它定義了一些操作方法用于發(fā)布和訂閱事件。
其中,Publish<TEvent>和PublishAsync<TEvent>方法用于發(fā)布事件,而OnSubscribe<TEvent>方法用于訂閱事件。然后,我們看到一個(gè)實(shí)現(xiàn)了本地事件總線的類(lèi)LocalEventBusManager<TEvent>。它實(shí)現(xiàn)了ILocalEventBusManager<TEvent>接口,用于在單一管道內(nèi)處理本地事件。它使用了一個(gè)Channel<TEvent>來(lái)存儲(chǔ)事件,并提供了發(fā)布事件的方法Publish和PublishAsync。此外,它還提供了一個(gè)自動(dòng)處理事件的方法AutoHandle。
總的來(lái)說(shuō)Event Bus提供了一種方便的方式來(lái)實(shí)現(xiàn)組件之間的松耦合通信。
通過(guò)發(fā)布和訂閱事件,組件可以獨(dú)立地進(jìn)行操作,而不需要直接依賴(lài)于彼此的實(shí)現(xiàn)細(xì)節(jié)。
這種機(jī)制可以提高代碼的可維護(hù)性和可擴(kuò)展性。
Github倉(cāng)庫(kù)地址:https://github.com/DonPangPang/soda-event-bus
實(shí)現(xiàn)一些基本約束
先實(shí)現(xiàn)一些約束,實(shí)現(xiàn)IEvent約束事件,實(shí)現(xiàn)IAsyncEvnetHandler<TEvent> where TEvent:IEvent來(lái)約束事件的處理程序。
public interface IEvent
{
}
public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{
Task HandleAsync(IEvent @event);
void HandleException(IEvent @event, Exception ex);
}
接下來(lái)規(guī)定一下咱們的IEventBus,會(huì)有哪些操作方法?;揪褪前l(fā)布和訂閱。
public interface IEventBus
{
void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
void OnSubscribe<TEvent>() where TEvent : IEvent;
}
實(shí)現(xiàn)一個(gè)本地事件總線
本地事件處理
本地事件的處理我打算采用兩種方式實(shí)現(xiàn),一種是LocalEventBusManager即本地事件管理,第二種是LocalEventBusPool池化本地事件。
LocalEvnetBusManager
LocalEventBusManager主要在單一管道內(nèi)進(jìn)行處理,集中進(jìn)行消費(fèi)。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{
void Publish(TEvent @event);
Task PublishAsync(TEvent @event) ;
void AutoHandle();
}
public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
where TEvent: IEvent
{
readonly IServiceProvider _servicesProvider = serviceProvider;
private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
public void Publish(TEvent @event)
{
Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
_eventChannel.Writer.WriteAsync(@event);
}
private CancellationTokenSource Cts { get; } = new();
public void Cancel()
{
Cts.Cancel();
}
public async Task PublishAsync(TEvent @event)
{
await _eventChannel.Writer.WriteAsync(@event);
}
public void AutoHandle()
{
// 確保只啟動(dòng)一次
if (!Cts.IsCancellationRequested) return;
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await _eventChannel.Reader.ReadAsync();
await HandleAsync(reader);
}
}, Cts.Token);
}
async Task HandleAsync(TEvent @event)
{
var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler is null)
{
throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
}
try
{
await handler.HandleAsync(@event);
}
catch (Exception ex)
{
handler.HandleException( @event, ex);
}
}
}
LocalEventBusPool
LocalEventBusPool即所有的Event都會(huì)有一個(gè)單獨(dú)的管道處理,單獨(dú)消費(fèi)處理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
private readonly IServiceProvider _serviceProvider = serviceProvider;
private class ChannelKey
{
public required string Key { get; init; }
public int Subscribers { get; set; }
public override bool Equals(object? obj)
{
if (obj is ChannelKey key)
{
return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
}
return false;
}
public override int GetHashCode()
{
return 0;
}
}
private Channel<IEvent> Rent(string channel)
{
_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);
if (value != null) return value;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(new ChannelKey() { Key = channel }, value);
return value;
}
private Channel<IEvent> Rent(ChannelKey channelKey)
{
_channels.TryGetValue(channelKey, out var value);
if (value != null) return value;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(channelKey, value);
return value;
}
private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
private CancellationTokenSource Cts { get; } = new();
public void Cancel()
{
Cts.Cancel();
_channels.Clear();
Cts.TryReset();
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
}
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
}
public void OnSubscribe<TEvent>() where TEvent : IEvent
{
var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
new ChannelKey() { Key = typeof(TEvent).Name };
channelKey.Subscribers++;
Task.Run(async () =>
{
try
{
while (!Cts.IsCancellationRequested)
{
var @event = await ReadAsync(channelKey);
var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
try
{
await handler.HandleAsync((TEvent)@event);
}
catch (Exception ex)
{
handler.HandleException((TEvent)@event, ex);
}
}
}
catch (Exception e)
{
throw new InvalidOperationException("Error on onSubscribe handler", e);
}
}, Cts.Token);
}
private async Task<IEvent> ReadAsync(string channel)
{
return await Rent(channel).Reader.ReadAsync(Cts.Token);
}
private async Task<IEvent> ReadAsync(ChannelKey channel)
{
return await Rent(channel).Reader.ReadAsync(Cts.Token);
}
}
LocalEventBus
實(shí)現(xiàn)LocalEventBus繼承自IEventBus即可,如果有需要擴(kuò)展的方法自行添加,池化和管理器的情況單獨(dú)處理。
public interface ILocalEventBus: IEventBus
{
}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.Publish(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.Publish(@event);
}
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
await EventBusPool.PublishAsync(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
await manager.PublishAsync(@event);
}
}
public void OnSubscribe<TEvent>() where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.OnSubscribe<TEvent>();
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.AutoHandle();
}
}
}
分布式事件總線
根據(jù)需要擴(kuò)展即可,基本邏輯相同,但可能需要增加確認(rèn)機(jī)制等。
到此這篇關(guān)于C#實(shí)現(xiàn)事件總線的方法示例的文章就介紹到這了,更多相關(guān)C# 事件總線內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Unity通用泛型單例設(shè)計(jì)模式(普通型和繼承自MonoBehaviour)
這篇文章主要介紹了Unity通用泛型單例設(shè)計(jì)模式,分為普通型和繼承MonoBehaviour,幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07
C#將布爾類(lèi)型轉(zhuǎn)換成字節(jié)數(shù)組的方法
這篇文章主要介紹了C#將布爾類(lèi)型轉(zhuǎn)換成字節(jié)數(shù)組的方法,涉及C#中字符串函數(shù)的使用技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2015-04-04
C#判斷數(shù)據(jù)類(lèi)型的簡(jiǎn)單示例代碼
本篇文章要是對(duì)C#中判斷數(shù)據(jù)類(lèi)型的簡(jiǎn)單示例代碼進(jìn)行了介紹,需要的朋友可以過(guò)來(lái)參考下,希望對(duì)大家有所幫助2014-01-01
C#實(shí)現(xiàn)將Doc文檔轉(zhuǎn)換成rtf格式的方法示例
這篇文章主要介紹了C#實(shí)現(xiàn)將Doc文檔轉(zhuǎn)換成rtf格式的方法,結(jié)合實(shí)例形式分析了C#針對(duì)word文件的讀取及文檔格式轉(zhuǎn)換相關(guān)操作技巧,需要的朋友可以參考下2017-07-07
C#引用類(lèi)型和值類(lèi)型的介紹與實(shí)例
這篇文章主要介紹了C#引用類(lèi)型和值類(lèi)型,有需要的朋友可以參考一下2013-12-12

