胆饬 发表于 2025-5-31 23:46:20

使用 .NET 实现 EventBus(事件总线)设计模式

EventBus(事件总线)是一种设计模式,它允许应用程序的不同部分之间进行松散耦合的通信。发布者(Publisher)将事件发布到总线,而订阅者(Subscriber)监听总线上的特定事件类型,并在事件发生时执行相应的操作。发布者和订阅者之间不需要直接引用。
核心概念:

[*]事件 (Event): 表示系统中发生的事情,通常是一个简单的 POCO (Plain Old CLR Object) 类。
[*]发布者 (Publisher): 创建并发送事件到 EventBus 的组件。
[*]订阅者 (Subscriber): 注册对特定类型事件感兴趣,并在事件发布时接收通知的组件。
[*]事件总线 (EventBus): 负责管理订阅和将事件路由到相应订阅者的中心枢纽。
设计思路:

[*]接口 (IEventBus): 定义 EventBus 的核心功能:订阅、取消订阅、发布。
[*]实现 (EventBus):

[*]使用一个数据结构(如 Dictionary)来存储事件类型与对应的订阅者列表。
[*]键 (Key) 是事件的 Type。
[*]值 (Value) 是一个委托列表(如 List> 或 List,后者需要进行类型转换)。
[*]需要考虑线程安全,因为订阅、取消订阅和发布可能在不同线程上发生。使用 lock 或 ConcurrentDictionary 等机制。

[*]泛型方法: 使用泛型使订阅和发布类型安全。
1. 定义事件 (Event)
// 事件可以是任何类,通常包含事件相关的数据
public class OrderCreatedEvent
{
    public int OrderId { get; }
    public DateTime Timestamp { get; }

    public OrderCreatedEvent(int orderId)
    {
      OrderId = orderId;
      Timestamp = DateTime.UtcNow;
    }

    public override string ToString()
    {
      return $"订单已创建: ID={OrderId}, 时间={Timestamp}";
    }
}

public class UserLoggedInEvent
{
    public string Username { get; }
    public DateTime LoginTime { get; }

    public UserLoggedInEvent(string username)
    {
      Username = username;
      LoginTime = DateTime.UtcNow;
    }

    public override string ToString()
    {
      return $"用户已登录: 用户名={Username}, 时间={LoginTime}";
    }
}2. 定义 EventBus 接口 (IEventBus)
using System;

public interface IEventBus
{
    /// <summary>
    /// 订阅指定类型的事件
    /// </summary>
    /// <typeparam name="TEvent">事件类型</typeparam>
    /// <param name="handler">事件处理委托</param>
    void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class;

    /// <summary>
    /// 取消订阅指定类型的事件
    /// </summary>
    /// <typeparam name="TEvent">事件类型</typeparam>
    /// <param name="handler">之前订阅时使用的同一个事件处理委托实例</param>
    void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class;

    /// <summary>
    /// 发布一个事件,所有订阅了该事件类型的处理程序都将被调用
    /// </summary>
    /// <typeparam name="TEvent">事件类型</typeparam>
    /// <param name="event">要发布的事件实例</param>
    void Publish<TEvent>(TEvent @event) where TEvent : class;
}3. 实现 EventBus (EventBus)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading; // 用于演示或简单场景,实际可能用 Task

public class EventBus : IEventBus
{
    // 存储订阅信息的字典
    // Key: 事件类型 (Type)
    // Value: 处理该事件类型的委托列表 (List of Action<TEvent>),这里用 List<object> 存储以便处理不同泛型类型
    private readonly Dictionary<Type, List<object>> _subscribers;
    // 用于确保线程安全的锁对象
    private readonly object _lock = new object();

    public EventBus()
    {
      _subscribers = new Dictionary<Type, List<object>>();
    }

    public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class
    {
      // 获取事件类型
      Type eventType = typeof(TEvent);

      lock (_lock)
      {
            // 如果该事件类型还没有订阅者,则创建新列表
            if (!_subscribers.ContainsKey(eventType))
            {
                _subscribers = new List<object>();
            }

            // 添加处理委托到列表中 (确保不重复添加同一个委托实例)
            if (!_subscribers.Contains(handler))
            {
               _subscribers.Add(handler);
            }
      }
    }

    public void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class
    {
      Type eventType = typeof(TEvent);

      lock (_lock)
      {
            // 检查是否存在该事件类型的订阅列表,以及列表中是否包含该处理委托
            if (_subscribers.TryGetValue(eventType, out var handlers))
            {
                handlers.Remove(handler);

                // 如果移除后列表为空,可以选择从字典中移除该事件类型(可选,优化内存)
                if (handlers.Count == 0)
                {
                  _subscribers.Remove(eventType);
                }
            }
      }
    }

    public void Publish<TEvent>(TEvent @event) where TEvent : class
    {
      Type eventType = typeof(TEvent);
      List<object> handlersToInvoke = null;

      lock (_lock)
      {
            // 检查是否有该事件类型的订阅者
            if (_subscribers.TryGetValue(eventType, out var handlers))
            {
                // 复制列表以避免在迭代时修改集合(如果在处理程序中取消订阅)
                // 并且在锁外执行委托调用,减少锁的持有时间
                handlersToInvoke = handlers.ToList();
            }
      }

      // 在锁外部调用处理程序,避免长时间持有锁
      if (handlersToInvoke != null)
      {
            foreach (var handlerObj in handlersToInvoke)
            {
                // 将 object 转换回具体的 Action<TEvent>
                if (handlerObj is Action<TEvent> handler)
                {
                  try
                  {
                        // 调用处理委托
                        // 实际应用中可能需要考虑:
                        // 1. 异步执行 (handler( @event).ConfigureAwait(false)等)
                        // 2. 错误处理 (try-catch 并记录日志)
                        handler(@event);
                  }
                  catch (Exception ex)
                  {
                        // 处理或记录订阅者处理事件时发生的异常
                        Console.WriteLine($"处理事件 {@event.GetType().Name} 时出错: {ex.Message}");
                        // 根据策略决定是否继续通知其他订阅者
                  }
                }
            }
      }
    }
}4. 示例:发布者和订阅者
// === 发布者 ===
public class OrderService
{
    private readonly IEventBus _eventBus;

    public OrderService(IEventBus eventBus)
    {
      _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
    }

    public void CreateOrder(int orderId)
    {
      Console.WriteLine($"订单服务: 正在创建订单 {orderId}...");
      // ... 执行创建订单的逻辑 ...
      Console.WriteLine($"订单服务: 订单 {orderId} 创建成功.");

      // 创建事件实例
      var orderCreatedEvent = new OrderCreatedEvent(orderId);

      // 发布事件到 EventBus
      Console.WriteLine($"订单服务: 发布 {nameof(OrderCreatedEvent)} 事件.");
      _eventBus.Publish(orderCreatedEvent);
    }
}

public class AuthenticationService
{
   private readonly IEventBus _eventBus;

    public AuthenticationService(IEventBus eventBus)
    {
      _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
    }

   public void Login(string username)
    {
      Console.WriteLine($"认证服务: 用户 {username} 尝试登录...");
      // ... 认证逻辑 ...
      Console.WriteLine($"认证服务: 用户 {username} 登录成功.");

      var loggedInEvent = new UserLoggedInEvent(username);
      Console.WriteLine($"认证服务: 发布 {nameof(UserLoggedInEvent)} 事件.");
      _eventBus.Publish(loggedInEvent);
    }
}


// === 订阅者 ===
public class EmailService : IDisposable // 实现 IDisposable 以便在服务生命周期结束时取消订阅
{
    private readonly IEventBus _eventBus;
    // 保存委托实例,以便取消订阅时使用
    private readonly Action<OrderCreatedEvent> _orderCreatedHandler;

    public EmailService(IEventBus eventBus)
    {
      _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
      // 将处理方法赋值给字段
      _orderCreatedHandler = HandleOrderCreated;
      // 订阅事件
      _eventBus.Subscribe(_orderCreatedHandler); // 使用保存的委托实例
      Console.WriteLine("邮件服务: 已订阅 OrderCreatedEvent.");
    }

    private void HandleOrderCreated(OrderCreatedEvent @event)
    {
      Console.WriteLine($"邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: {@event.OrderId})...");
      // 模拟发送邮件
      Thread.Sleep(50); // 模拟耗时
      Console.WriteLine($"邮件服务: 订单 {@event.OrderId} 的创建通知邮件已发送.");
    }

    public void Dispose()
    {
      // 在对象销毁时取消订阅,防止内存泄漏
      _eventBus.Unsubscribe(_orderCreatedHandler); // 使用保存的委托实例取消订阅
      Console.WriteLine("邮件服务: 已取消订阅 OrderCreatedEvent.");
    }
}

public class InventoryService
{
    private readonly IEventBus _eventBus;

    public InventoryService(IEventBus eventBus)
    {
      _eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
      // 可以直接使用 Lambda 表达式或方法组,但取消订阅会稍微麻烦些
      // 如果需要取消订阅,最好像 EmailService 那样保存委托实例
      _eventBus.Subscribe<OrderCreatedEvent>(HandleOrderCreated);
         Console.WriteLine("库存服务: 已订阅 OrderCreatedEvent.");
    }

    private void HandleOrderCreated(OrderCreatedEvent @event)
    {
      Console.WriteLine($"库存服务: 收到订单创建事件. 正在为订单 {@event.OrderId} 预留库存...");
      // 模拟库存操作
      Thread.Sleep(100); // 模拟耗时
      Console.WriteLine($"库存服务: 订单 {@event.OrderId} 的库存已预留.");
    }

   // 注意:如果用 Lambda 或方法组直接订阅且需要取消订阅,需要保存那个委托实例
   // public void StopListening() { _eventBus.Unsubscribe<OrderCreatedEvent>(HandleOrderCreated); } // 这样可能无法正确取消匿名方法
}

public class AuditService
{
   private readonly IEventBus _eventBus;

    public AuditService(IEventBus eventBus)
    {
      _eventBus = eventBus;
      // 订阅多种事件
      _eventBus.Subscribe<OrderCreatedEvent>(LogEvent);
      _eventBus.Subscribe<UserLoggedInEvent>(LogEvent);
         Console.WriteLine("审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.");
    }

    // 一个通用的日志记录方法,处理不同类型的事件
    private void LogEvent<TEvent>(TEvent @event) where TEvent : class
    {
         Console.WriteLine($"审计服务: 记录到事件 - 类型: {@event.GetType().Name}, 内容: {@event.ToString()}");
    }

    // 可选:如果需要取消订阅,同样需要保存委托实例
}5. 运行示例
using System;

public class Program
{
    public static void Main(string[] args)
    {
      // 1. 创建 EventBus 实例 (通常通过依赖注入容器管理,这里手动创建)
      IEventBus eventBus = new EventBus();

      // 2. 创建订阅者实例,并将 EventBus 注入
      using (var emailService = new EmailService(eventBus)) // 使用 using 确保 Dispose 被调用以取消订阅
      {
            var inventoryService = new InventoryService(eventBus);
            var auditService = new AuditService(eventBus);

            Console.WriteLine("\n--- 初始化完成, 准备发布事件 ---\n");

            // 3. 创建发布者实例
            var orderService = new OrderService(eventBus);
            var authService = new AuthenticationService(eventBus);

            // 4. 发布者执行操作并发布事件
            orderService.CreateOrder(101);

            Console.WriteLine("\n----------------------------\n");

            authService.Login("Alice");

            Console.WriteLine("\n--- 事件处理完成 ---\n");

            // 模拟 EmailService 停止工作(离开 using 块会自动调用 Dispose 取消订阅)
      } // emailService.Dispose() 会在这里被调用

      Console.WriteLine("\n--- EmailService 已停止并取消订阅 ---\n");

      // 再次创建订单,EmailService 不应再收到通知
      var orderService2 = new OrderService(eventBus); // 可以复用之前的,这里新建仅为演示
      orderService2.CreateOrder(102);


      Console.WriteLine("\n--- 程序结束 ---");
      Console.ReadKey();
    }
}输出示例:
邮件服务: 已订阅 OrderCreatedEvent.
库存服务: 已订阅 OrderCreatedEvent.
审计服务: 已订阅 OrderCreatedEvent 和 UserLoggedInEvent.

--- 初始化完成, 准备发布事件 ---

订单服务: 正在创建订单 101...
订单服务: 订单 101 创建成功.
订单服务: 发布 OrderCreatedEvent 事件.
邮件服务: 收到订单创建事件. 准备发送邮件通知 (订单ID: 101)...
库存服务: 收到订单创建事件. 正在为订单 101 预留库存...
审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=101, 时间=...
邮件服务: 订单 101 的创建通知邮件已发送.
库存服务: 订单 101 的库存已预留.

----------------------------

认证服务: 用户 Alice 尝试登录...
认证服务: 用户 Alice 登录成功.
认证服务: 发布 UserLoggedInEvent 事件.
审计服务: 记录到事件 - 类型: UserLoggedInEvent, 内容: 用户已登录: 用户名=Alice, 时间=...

--- 事件处理完成 ---

邮件服务: 已取消订阅 OrderCreatedEvent.

--- EmailService 已停止并取消订阅 ---

订单服务: 正在创建订单 102...
订单服务: 订单 102 创建成功.
订单服务: 发布 OrderCreatedEvent 事件.
库存服务: 收到订单创建事件. 正在为订单 102 预留库存...
审计服务: 记录到事件 - 类型: OrderCreatedEvent, 内容: 订单已创建: ID=102, 时间=...
库存服务: 订单 102 的库存已预留.

--- 程序结束 ---关键点和考虑:

[*]线程安全: 上述实现使用了简单的 lock 来保护 _subscribers 字典。对于高并发场景,可能需要更精细的锁策略或使用 ConcurrentDictionary(但仍需注意其内部列表的线程安全)。在 Publish 方法中,先复制订阅者列表再在锁外调用委托,是为了减少锁的持有时间,提高并发性能,并防止在处理程序中修改订阅列表导致迭代错误。
[*]错误处理: 订阅者的处理程序中可能抛出异常。Publish 方法应该能够处理这些异常(例如,记录日志),并决定是否继续通知其他订阅者。
[*]异步处理: 如果订阅者的处理逻辑是 I/O 密集型或耗时的,应该考虑使用异步委托 (Func) 和异步发布机制,以避免阻塞发布线程。
[*]取消订阅: 非常重要,尤其是在订阅者的生命周期比 EventBus 短的情况下(例如 UI 组件、临时服务)。否则会导致内存泄漏(EventBus 持有对已不再需要的订阅者的引用)。使用 IDisposable 是一种常见的管理取消订阅的方式。确保 Unsubscribe 时传入的是 Subscribe 时使用的同一个委托实例。
[*]弱引用: 对于某些场景(如 UI 框架),如果订阅者可能被垃圾回收而没有显式取消订阅,可以使用 WeakReference 来持有委托,但这会增加实现的复杂性。
[*]依赖注入: 在实际应用中,IEventBus 通常注册为单例(Singleton)服务,并通过依赖注入(DI)容器注入到需要它的发布者和订阅者中。
[*]事件继承: 当前设计是基于精确的事件类型匹配。如果需要支持订阅基类事件或接口事件来接收所有派生类/实现类的事件,Publish 方法的逻辑需要相应调整(遍历事件类型的继承链或接口)。
这个简单的 EventBus 实现提供了一个基础框架,你可以根据具体项目的需求进行扩展和优化。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 使用 .NET 实现 EventBus(事件总线)设计模式