使用 .NET 实现 EventBus(事件总线)设计模式
EventBus(事件总线)是一种设计模式,它允许应用程序的不同部分之间进行松散耦合的通信。发布者(Publisher)将事件发布到总线,而订阅者(Subscriber)监听总线上的特定事件类型,并在事件发生时执行相应的操作。发布者和订阅者之间不需要直接引用。核心概念:
[*]事件 (Event): 表示系统中发生的事情,通常是一个简单的 POCO (Plain Old CLR Object) 类。
[*]发布者 (Publisher): 创建并发送事件到 EventBus 的组件。
[*]订阅者 (Subscriber): 注册对特定类型事件感兴趣,并在事件发布时接收通知的组件。
[*]事件总线 (EventBus): 负责管理订阅和将事件路由到相应订阅者的中心枢纽。
设计思路:
[*]接口 (IEventBus): 定义 EventBus 的核心功能:订阅、取消订阅、发布。
[*]实现 (EventBus):
[*]使用一个数据结构(如 Dictionary)来存储事件类型与对应的订阅者列表。
[*]键 (Key) 是事件的 Type。
[*]值 (Value) 是一个委托列表(如 List> 或 List,后者需要进行类型转换)。
[*]需要考虑线程安全,因为订阅、取消订阅和发布可能在不同线程上发生。使用 lock 或 ConcurrentDictionary 等机制。
[*]泛型方法: 使用泛型使订阅和发布类型安全。
1. 定义事件 (Event)
// 事件可以是任何类,通常包含事件相关的数据
public class OrderCreatedEvent
{
public int OrderId { get; }
public DateTime Timestamp { get; }
public OrderCreatedEvent(int orderId)
{
OrderId = orderId;
Timestamp = DateTime.UtcNow;
}
public override string ToString()
{
return $"订单已创建: ID={OrderId}, 时间={Timestamp}";
}
}
public class UserLoggedInEvent
{
public string Username { get; }
public DateTime LoginTime { get; }
public UserLoggedInEvent(string username)
{
Username = username;
LoginTime = DateTime.UtcNow;
}
public override string ToString()
{
return $"用户已登录: 用户名={Username}, 时间={LoginTime}";
}
}2. 定义 EventBus 接口 (IEventBus)
using System;
public interface IEventBus
{
/// <summary>
/// 订阅指定类型的事件
/// </summary>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <param name="handler">事件处理委托</param>
void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
/// <summary>
/// 取消订阅指定类型的事件
/// </summary>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <param name="handler">之前订阅时使用的同一个事件处理委托实例</param>
void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
/// <summary>
/// 发布一个事件,所有订阅了该事件类型的处理程序都将被调用
/// </summary>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <param name="event">要发布的事件实例</param>
void Publish<TEvent>(TEvent @event) where TEvent : class;
}3. 实现 EventBus (EventBus)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading; // 用于演示或简单场景,实际可能用 Task
public class EventBus : IEventBus
{
// 存储订阅信息的字典
// Key: 事件类型 (Type)
// Value: 处理该事件类型的委托列表 (List of Action<TEvent>),这里用 List<object> 存储以便处理不同泛型类型
private readonly Dictionary<Type, List<object>> _subscribers;
// 用于确保线程安全的锁对象
private readonly object _lock = new object();
public EventBus()
{
_subscribers = new Dictionary<Type, List<object>>();
}
public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class
{
// 获取事件类型
Type eventType = typeof(TEvent);
lock (_lock)
{
// 如果该事件类型还没有订阅者,则创建新列表
if (!_subscribers.ContainsKey(eventType))
{
_subscribers = new List<object>();
}
// 添加处理委托到列表中 (确保不重复添加同一个委托实例)
if (!_subscribers.Contains(handler))
{
_subscribers.Add(handler);
}
}
}
public void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class
{
Type eventType = typeof(TEvent);
lock (_lock)
{
// 检查是否存在该事件类型的订阅列表,以及列表中是否包含该处理委托
if (_subscribers.TryGetValue(eventType, out var handlers))
{
handlers.Remove(handler);
// 如果移除后列表为空,可以选择从字典中移除该事件类型(可选,优化内存)
if (handlers.Count == 0)
{
_subscribers.Remove(eventType);
}
}
}
}
public void Publish<TEvent>(TEvent @event) where TEvent : class
{
Type eventType = typeof(TEvent);
List<object> handlersToInvoke = null;
lock (_lock)
{
// 检查是否有该事件类型的订阅者
if (_subscribers.TryGetValue(eventType, out var handlers))
{
// 复制列表以避免在迭代时修改集合(如果在处理程序中取消订阅)
// 并且在锁外执行委托调用,减少锁的持有时间
handlersToInvoke = handlers.ToList();
}
}
// 在锁外部调用处理程序,避免长时间持有锁
if (handlersToInvoke != null)
{
foreach (var handlerObj in handlersToInvoke)
{
// 将 object 转换回具体的 Action<TEvent>
if (handlerObj is Action<TEvent> handler)
{
try
{
// 调用处理委托
// 实际应用中可能需要考虑:
// 1. 异步执行 (handler( @event).ConfigureAwait(false)等)
// 2. 错误处理 (try-catch 并记录日志)
handler(@event);
}
catch (Exception ex)
{
// 处理或记录订阅者处理事件时发生的异常
Console.WriteLine($"处理事件 {@event.GetType().Name} 时出错: {ex.Message}");
// 根据策略决定是否继续通知其他订阅者
}
}
}
}
}
}4. 示例:发布者和订阅者
// === 发布者 ===
public class OrderService
{
private readonly IEventBus _eventBus;
public OrderService(IEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}
public void CreateOrder(int orderId)
{
Console.WriteLine($"订单服务: 正在创建订单 {orderId}...");
// ... 执行创建订单的逻辑 ...
Console.WriteLine($"订单服务: 订单 {orderId} 创建成功.");
// 创建事件实例
var orderCreatedEvent = new OrderCreatedEvent(orderId);
// 发布事件到 EventBus
Console.WriteLine($"订单服务: 发布 {nameof(OrderCreatedEvent)} 事件.");
_eventBus.Publish(orderCreatedEvent);
}
}
public class AuthenticationService
{
private readonly IEventBus _eventBus;
public AuthenticationService(IEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}
public void Login(string username)
{
Console.WriteLine($"认证服务: 用户 {username} 尝试登录...");
// ... 认证逻辑 ...
Console.WriteLine($"认证服务: 用户 {username} 登录成功.");
var loggedInEvent = new UserLoggedInEvent(username);
Console.WriteLine($"认证服务: 发布 {nameof(UserLoggedInEvent)} 事件.");
_eventBus.Publish(loggedInEvent);
}
}
// === 订阅者 ===
public class EmailService : IDisposable // 实现 IDisposable 以便在服务生命周期结束时取消订阅
{
private readonly IEventBus _eventBus;
// 保存委托实例,以便取消订阅时使用
private readonly Action<OrderCreatedEvent> _orderCreatedHandler;
public EmailService(IEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
// 将处理方法赋值给字段
_orderCreatedHandler = HandleOrderCreated;
// 订阅事件
_eventBus.Subscribe(_orderCreatedHandler); // 使用保存的委托实例
Console.WriteLine("邮件服务: 已订阅 OrderCreatedEvent.");
}
private void HandleOrderCreated(OrderCreatedEvent @event)
{
Console.WriteLine($"邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: {@event.OrderId})...");
// 模拟发送邮件
Thread.Sleep(50); // 模拟耗时
Console.WriteLine($"邮件服务: 订单 {@event.OrderId} 的创建通知邮件已发送.");
}
public void Dispose()
{
// 在对象销毁时取消订阅,防止内存泄漏
_eventBus.Unsubscribe(_orderCreatedHandler); // 使用保存的委托实例取消订阅
Console.WriteLine("邮件服务: 已取消订阅 OrderCreatedEvent.");
}
}
public class InventoryService
{
private readonly IEventBus _eventBus;
public InventoryService(IEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
// 可以直接使用 Lambda 表达式或方法组,但取消订阅会稍微麻烦些
// 如果需要取消订阅,最好像 EmailService 那样保存委托实例
_eventBus.Subscribe<OrderCreatedEvent>(HandleOrderCreated);
Console.WriteLine("库存服务: 已订阅 OrderCreatedEvent.");
}
private void HandleOrderCreated(OrderCreatedEvent @event)
{
Console.WriteLine($"库存服务: 收到订单创建事件. 正在为订单 {@event.OrderId} 预留库存...");
// 模拟库存操作
Thread.Sleep(100); // 模拟耗时
Console.WriteLine($"库存服务: 订单 {@event.OrderId} 的库存已预留.");
}
// 注意:如果用 Lambda 或方法组直接订阅且需要取消订阅,需要保存那个委托实例
// public void StopListening() { _eventBus.Unsubscribe<OrderCreatedEvent>(HandleOrderCreated); } // 这样可能无法正确取消匿名方法
}
public class AuditService
{
private readonly IEventBus _eventBus;
public AuditService(IEventBus eventBus)
{
_eventBus = eventBus;
// 订阅多种事件
_eventBus.Subscribe<OrderCreatedEvent>(LogEvent);
_eventBus.Subscribe<UserLoggedInEvent>(LogEvent);
Console.WriteLine("审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.");
}
// 一个通用的日志记录方法,处理不同类型的事件
private void LogEvent<TEvent>(TEvent @event) where TEvent : class
{
Console.WriteLine($"审计服务: 记录到事件 - 类型: {@event.GetType().Name}, 内容: {@event.ToString()}");
}
// 可选:如果需要取消订阅,同样需要保存委托实例
}5. 运行示例
using System;
public class Program
{
public static void Main(string[] args)
{
// 1. 创建 EventBus 实例 (通常通过依赖注入容器管理,这里手动创建)
IEventBus eventBus = new EventBus();
// 2. 创建订阅者实例,并将 EventBus 注入
using (var emailService = new EmailService(eventBus)) // 使用 using 确保 Dispose 被调用以取消订阅
{
var inventoryService = new InventoryService(eventBus);
var auditService = new AuditService(eventBus);
Console.WriteLine("\n--- 初始化完成, 准备发布事件 ---\n");
// 3. 创建发布者实例
var orderService = new OrderService(eventBus);
var authService = new AuthenticationService(eventBus);
// 4. 发布者执行操作并发布事件
orderService.CreateOrder(101);
Console.WriteLine("\n----------------------------\n");
authService.Login("Alice");
Console.WriteLine("\n--- 事件处理完成 ---\n");
// 模拟 EmailService 停止工作(离开 using 块会自动调用 Dispose 取消订阅)
} // emailService.Dispose() 会在这里被调用
Console.WriteLine("\n--- EmailService 已停止并取消订阅 ---\n");
// 再次创建订单,EmailService 不应再收到通知
var orderService2 = new OrderService(eventBus); // 可以复用之前的,这里新建仅为演示
orderService2.CreateOrder(102);
Console.WriteLine("\n--- 程序结束 ---");
Console.ReadKey();
}
}输出示例:
邮件服务: 已订阅 OrderCreatedEvent.
库存服务: 已订阅 OrderCreatedEvent.
审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.
--- 初始化完成, 准备发布事件 ---
订单服务: 正在创建订单 101...
订单服务: 订单 101 创建成功.
订单服务: 发布 OrderCreatedEvent 事件.
邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: 101)...
库存服务: 收到订单创建事件. 正在为订单 101 预留库存...
审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=101, 时间=...
邮件服务: 订单 101 的创建通知邮件已发送.
库存服务: 订单 101 的库存已预留.
----------------------------
认证服务: 用户 Alice 尝试登录...
认证服务: 用户 Alice 登录成功.
认证服务: 发布 UserLoggedInEvent 事件.
审计服务: 记录到事件 - 类型: UserLoggedInEvent, 内容: 用户已登录: 用户名=Alice, 时间=...
--- 事件处理完成 ---
邮件服务: 已取消订阅 OrderCreatedEvent.
--- EmailService 已停止并取消订阅 ---
订单服务: 正在创建订单 102...
订单服务: 订单 102 创建成功.
订单服务: 发布 OrderCreatedEvent 事件.
库存服务: 收到订单创建事件. 正在为订单 102 预留库存...
审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=102, 时间=...
库存服务: 订单 102 的库存已预留.
--- 程序结束 ---关键点和考虑:
[*]线程安全: 上述实现使用了简单的 lock 来保护 _subscribers 字典。对于高并发场景,可能需要更精细的锁策略或使用 ConcurrentDictionary(但仍需注意其内部列表的线程安全)。在 Publish 方法中,先复制订阅者列表再在锁外调用委托,是为了减少锁的持有时间,提高并发性能,并防止在处理程序中修改订阅列表导致迭代错误。
[*]错误处理: 订阅者的处理程序中可能抛出异常。Publish 方法应该能够处理这些异常(例如,记录日志),并决定是否继续通知其他订阅者。
[*]异步处理: 如果订阅者的处理逻辑是 I/O 密集型或耗时的,应该考虑使用异步委托 (Func) 和异步发布机制,以避免阻塞发布线程。
[*]取消订阅: 非常重要,尤其是在订阅者的生命周期比 EventBus 短的情况下(例如 UI 组件、临时服务)。否则会导致内存泄漏(EventBus 持有对已不再需要的订阅者的引用)。使用 IDisposable 是一种常见的管理取消订阅的方式。确保 Unsubscribe 时传入的是 Subscribe 时使用的同一个委托实例。
[*]弱引用: 对于某些场景(如 UI 框架),如果订阅者可能被垃圾回收而没有显式取消订阅,可以使用 WeakReference 来持有委托,但这会增加实现的复杂性。
[*]依赖注入: 在实际应用中,IEventBus 通常注册为单例(Singleton)服务,并通过依赖注入(DI)容器注入到需要它的发布者和订阅者中。
[*]事件继承: 当前设计是基于精确的事件类型匹配。如果需要支持订阅基类事件或接口事件来接收所有派生类/实现类的事件,Publish 方法的逻辑需要相应调整(遍历事件类型的继承链或接口)。
这个简单的 EventBus 实现提供了一个基础框架,你可以根据具体项目的需求进行扩展和优化。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]