找回密码
 立即注册
首页 业界区 业界 PipelinR:在Java中实现优雅的CQRS架构

PipelinR:在Java中实现优雅的CQRS架构

秤陷曲 2 小时前
使用中介者模式轻松实现命令查询职责分离,构建高内聚、低耦合的应用系统
一、知识点回顾

1. 什么是CQRS?

CQRS是Command Query Responsibility Segregation的缩写,一般称作命令查询职责分离。从字面意思理解,就是将命令(写入)和查询(读取)的责任划分到不同的模型中。
对比一下常用的 CRUD 模式(创建-读取-更新-删除),通常我们会让用户界面与负责所有四种操作的数据存储交互。而 CQRS 则将这些操作分成两种模式,一种用于查询(又称 "R"),另一种用于命令(又称 "CUD")。
1.png

2. CQRS的作用是什么?

CQRS将系统的写操作(命令)和读操作(查询)分离到不同的模型和数据存储中,从而实现读写分离,提高系统的性能、可扩展性和安全性,并使复杂业务逻辑(写端)和高效查询(读端)各自得到优化,降低系统复杂性。它允许为写操作设计严谨的领域模型,为读操作设计简单、只关注查询效率的数据模型(如专用视图或报表数据库),并可通过事件等机制保持最终一致性。
3. CQRS 的优点


  • 独立缩放。 CQRS 使读取模型和写入模型能够独立缩放。 此方法可帮助最大程度地减少锁争用并提高负载下的系统性能。
  • 优化的数据架构。 读取操作可以使用针对查询进行优化的模式。 写入操作使用针对更新优化的模式。
  • 安全性。 通过分隔读取和写入,可以确保只有适当的域实体或操作有权对数据执行写入操作。
  • 关注点分离。 分离读取和写入责任会导致更简洁、更易于维护的模型。 写入端通常处理复杂的业务逻辑。 读取端可以保持简单且专注于查询效率。
  • 更简单的查询。 在读取数据库中存储具体化视图时,应用程序可以在查询时避免复杂的联接。
二、关于PipelinR

项目地址
https://github.com/sizovs/PipelinR
项目开发者在Github的介绍不多,关键是最后一句话:It's similar to a popular MediatR .NET library. 意思就是这个项目是参考着一个叫MediatR的.net库写的。关于MediatR我之前有两篇文章专门介绍过。
PipelinR(包括MediatR)提供了一种CQRS的实现方式,基于中介者模式实现进程内消息传递,用于解耦应用中的各个组件,支持请求/响应(一对一,有返回值)和发布/订阅(一对多,无返回值)两种消息模式。它们在内部提供管道行为 (Pipeline Behaviors),用于在消息处理前后插入自定义逻辑,如日志、验证、异常处理等。
需要提醒的是,PipelinR并不是一个完整的CQRS框架,它只是一个中介者模式的具体实现方式,将调用方和处理方进行了解耦,而这种模式恰好可以用来在一个单体应用(或者是微服务的服务内部)中实现简单的CQRS。
三、依赖安装和配置

1. Maven安装
  1. <dependency>
  2.   <groupId>net.sizovs</groupId>
  3.   pipelinr</artifactId>
  4.   <version>0.11</version>
  5. </dependency>
复制代码
2. Gradle安装
  1. dependencies {
  2.     compile 'net.sizovs:pipelinr:0.11'
  3. }
复制代码
在Spring项目中配置PipelinR
  1. @Configuration
  2. public class PipelinrConfiguration {
  3.     @Bean
  4.     Pipeline pipeline(ObjectProvider<Command.Handler> commandHandlers, ObjectProvider<Notification.Handler> notificationHandlers, ObjectProvider<Command.Middleware> middlewares) {
  5.         return new Pipelinr()
  6.           .with(commandHandlers::stream)
  7.           .with(notificationHandlers::stream)
  8.           .with(middlewares::orderedStream);
  9.     }
  10. }
复制代码
四、核心组件


  • Pipeline/Pipelinr:Pipeline是消息和处理器之间的中介者,调用方向Pipeline发送消息,Pipeline收到消息后通过注册到Pipeline的中间件进行层层传递并最终抵达匹配的消息处理器进行处理。Pipelinr是Pipeline的默认实现。
  • Command:用于约定请求/响应模式的消息类型,泛型参数R是返回值的类型,如果不需要返回值,可以将R指定为Voidy。
  • Notification:用于约定发布/订阅模式的消息类型,没有返回值,消息可以有多个处理器。
  • Middleware:管道中间件,Command和Notification都定义了各自的中间件接口。Pipeline接收到的消息,在到达最终的处理器之前,会经过所有注册到Pipeline的中间。可以使用Middleware实现诸如日志记录、数据验证、开启事务等一系列操作。
五、请求/响应模式实现

请求/响应模式需要用到Command接口。
1. 定义Command

Command代表一个请求,需要实现net.sizovs.pipelinr.Command接口。泛型参数指定返回值类型。
  1. // 定义一个创建用户的命令
  2. public class CreateUserCommand implements Command<UserResponse> {
  3.     private String username;
  4.     private String email;
  5.    
  6.     public CreateUserCommand(String username, String email) {
  7.         this.username = username;
  8.         this.email = email;
  9.     }
  10.    
  11.     public String getUsername() {
  12.         return username;
  13.     }
  14.    
  15.     public String getEmail() {
  16.         return email;
  17.     }
  18. }
  19. // 返回值类型
  20. public class UserResponse {
  21.     private Long userId;
  22.     private String username;
  23.     private String email;
  24.    
  25.     public UserResponse(Long userId, String username, String email) {
  26.         this.userId = userId;
  27.         this.username = username;
  28.         this.email = email;
  29.     }
  30.    
  31.     // getters
  32. }
复制代码
2. 定义Command Handler

创建该Command对应的处理器,实现net.sizovs.pipelinr.Command.Handler接口。
  1. @Component
  2. public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
  3.    
  4.     @Autowired
  5.     private UserRepository userRepository;
  6.    
  7.     @Override
  8.     public UserResponse handle(CreateUserCommand command) {
  9.         // 业务逻辑处理
  10.         User user = new User();
  11.         user.setUsername(command.getUsername());
  12.         user.setEmail(command.getEmail());
  13.         
  14.         User savedUser = userRepository.save(user);
  15.         
  16.         return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
  17.     }
  18. }
复制代码
3. 在业务代码中使用

通过注入Pipeline实例,发送Command并获取响应。
  1. @Service
  2. public class UserService {
  3.    
  4.     @Autowired
  5.     private Pipeline pipeline;
  6.    
  7.     public UserResponse createUser(String username, String email) {
  8.         CreateUserCommand command = new CreateUserCommand(username, email);
  9.         UserResponse response = pipeline.send(command);
  10.         return response;
  11.     }
  12. }
复制代码
4. 添加Command中间件

中间件可以在Command处理前后执行一些操作,如验证、日志、事务管理等。
  1. @Component
  2. public class LoggingMiddleware implements Command.Middleware {
  3.    
  4.     private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
  5.    
  6.     @Override
  7.     public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
  8.         logger.info("Executing command: {}", command.getClass().getSimpleName());
  9.         try {
  10.             R result = chain.proceed(command);
  11.             logger.info("Command executed successfully");
  12.             return result;
  13.         } catch (Exception e) {
  14.             logger.error("Command execution failed", e);
  15.             throw e;
  16.         }
  17.     }
  18. }
  19. @Component
  20. public class ValidationMiddleware implements Command.Middleware {
  21.    
  22.     @Autowired
  23.     private Validator validator;
  24.    
  25.     @Override
  26.     public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
  27.         Set<ConstraintViolation<C>> violations = validator.validate(command);
  28.         if (!violations.isEmpty()) {
  29.             throw new ConstraintViolationException("Validation failed", violations);
  30.         }
  31.         return chain.proceed(command);
  32.     }
  33. }
  34. @Component
  35. @Order(1) // 指定中间件执行顺序
  36. public class TransactionMiddleware implements Command.Middleware {
  37.    
  38.     @Autowired
  39.     private PlatformTransactionManager transactionManager;
  40.    
  41.     @Override
  42.     public <R, C extends Command<R>> R invoke(C command, Chain<R> chain) {
  43.         TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
  44.         try {
  45.             R result = chain.proceed(command);
  46.             transactionManager.commit(status);
  47.             return result;
  48.         } catch (Exception e) {
  49.             transactionManager.rollback(status);
  50.             throw e;
  51.         }
  52.     }
  53. }
复制代码
六、发布/订阅模式实现

发布/订阅模式使用Notification接口,用于一对多的消息分发,没有返回值。
1. 定义Notification

Notification代表一个事件通知,需要实现net.sizovs.pipelinr.Notification接口。
  1. // 定义一个用户创建成功的事件通知
  2. public class UserCreatedNotification implements Notification {
  3.     private Long userId;
  4.     private String username;
  5.     private String email;
  6.     private LocalDateTime createdTime;
  7.    
  8.     public UserCreatedNotification(Long userId, String username, String email) {
  9.         this.userId = userId;
  10.         this.username = username;
  11.         this.email = email;
  12.         this.createdTime = LocalDateTime.now();
  13.     }
  14.    
  15.     // getters
  16. }
复制代码
2. 定义Notification Handler

Notification可以有多个处理器,每个处理器实现net.sizovs.pipelinr.Notification.Handler接口。
  1. @Component
  2. public class SendWelcomeEmailHandler implements Notification.Handler<UserCreatedNotification> {
  3.    
  4.     private static final Logger logger = LoggerFactory.getLogger(SendWelcomeEmailHandler.class);
  5.    
  6.     @Autowired
  7.     private EmailService emailService;
  8.    
  9.     @Override
  10.     public void handle(UserCreatedNotification notification) {
  11.         logger.info("Sending welcome email to user: {}", notification.getUsername());
  12.         emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());
  13.     }
  14. }
  15. @Component
  16. public class LogUserCreationHandler implements Notification.Handler<UserCreatedNotification> {
  17.    
  18.     private static final Logger logger = LoggerFactory.getLogger(LogUserCreationHandler.class);
  19.    
  20.     @Autowired
  21.     private UserAuditLogRepository auditLogRepository;
  22.    
  23.     @Override
  24.     public void handle(UserCreatedNotification notification) {
  25.         logger.info("Logging user creation: {}", notification.getUsername());
  26.         UserAuditLog auditLog = new UserAuditLog();
  27.         auditLog.setUserId(notification.getUserId());
  28.         auditLog.setOperation("CREATE");
  29.         auditLog.setTimestamp(notification.getCreatedTime());
  30.         auditLogRepository.save(auditLog);
  31.     }
  32. }
  33. @Component
  34. public class UpdateUserStatisticsHandler implements Notification.Handler<UserCreatedNotification> {
  35.    
  36.     private static final Logger logger = LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);
  37.    
  38.     @Autowired
  39.     private UserStatisticsRepository statisticsRepository;
  40.    
  41.     @Override
  42.     public void handle(UserCreatedNotification notification) {
  43.         logger.info("Updating statistics for new user: {}", notification.getUsername());
  44.         UserStatistics stats = statisticsRepository.findOrCreate();
  45.         stats.incrementTotalUsers();
  46.         statisticsRepository.save(stats);
  47.     }
  48. }
复制代码
3. 发送Notification

在Command处理完成后,可以发送Notification通知所有相关的处理器。
  1. @Component
  2. public class CreateUserCommandHandler implements Command.Handler<CreateUserCommand, UserResponse> {
  3.    
  4.     @Autowired
  5.     private UserRepository userRepository;
  6.    
  7.     @Autowired
  8.     private Pipeline pipeline;
  9.    
  10.     @Override
  11.     public UserResponse handle(CreateUserCommand command) {
  12.         // 业务逻辑处理
  13.         User user = new User();
  14.         user.setUsername(command.getUsername());
  15.         user.setEmail(command.getEmail());
  16.         
  17.         User savedUser = userRepository.save(user);
  18.         
  19.         // 发送事件通知
  20.         UserCreatedNotification notification = new UserCreatedNotification(
  21.             savedUser.getId(),
  22.             savedUser.getUsername(),
  23.             savedUser.getEmail()
  24.         );
  25.         pipeline.send(notification);
  26.         
  27.         return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
  28.     }
  29. }
复制代码
4. 添加Notification中间件

类似Command,Notification也支持中间件。
  1. @Component
  2. public class NotificationLoggingMiddleware implements Notification.Middleware {
  3.    
  4.     private static final Logger logger = LoggerFactory.getLogger(NotificationLoggingMiddleware.class);
  5.    
  6.     @Override
  7.     public <N extends Notification> void invoke(N notification, Chain chain) {
  8.         logger.info("Publishing notification: {}", notification.getClass().getSimpleName());
  9.         try {
  10.             chain.proceed(notification);
  11.             logger.info("Notification published successfully");
  12.         } catch (Exception e) {
  13.             logger.error("Notification publishing failed", e);
  14.             throw e;
  15.         }
  16.     }
  17. }
  18. @Component
  19. public class NotificationErrorHandlingMiddleware implements Notification.Middleware {
  20.    
  21.     private static final Logger logger = LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);
  22.    
  23.     @Override
  24.     public <N extends Notification> void invoke(N notification, Chain chain) {
  25.         try {
  26.             chain.proceed(notification);
  27.         } catch (Exception e) {
  28.             logger.error("Error handling notification: {}", notification.getClass().getSimpleName(), e);
  29.             // 可以选择吞掉异常或重新抛出,取决于业务需求
  30.             // throw e;
  31.         }
  32.     }
  33. }
复制代码
七、总结

核心收获

通过本文的介绍,我们了解了如何在Java应用中使用PipelinR框架实现CQRS模式。核心要点总结如下:
1. CQRS的价值


  • 读写分离:通过Command处理写操作,Notification处理事件响应,实现职责的明确划分
  • 独立优化:读端和写端可以独立优化,不同的数据模型适应不同的场景需求
  • 系统解耦:中介者模式解耦了调用方和处理方,提高了系统的可维护性和可扩展性
2. PipelinR的核心特性


  • 轻量级实现:相比完整的CQRS框架,PipelinR更轻便,学习成本低
  • 灵活的管道机制:通过中间件可以方便地植入横切关注点(如日志、验证、事务等)
  • 支持两种消息模式:Command用于请求/响应,Notification用于发布/订阅
3. 最佳实践建议


  • 合理使用中间件:通过@Order注解控制中间件执行顺序,但要避免中间件层级过多导致性能问题
  • 异常处理:根据场景选择合适的异常处理策略,Notification可考虑不中断其他处理器的错误隔离
  • 事件驱动设计:充分利用Notification实现事件驱动架构,解耦不同的业务流程
  • 代码组织:按照Command、Handler、Middleware的划分方式组织代码,保持结构清晰
实施建议

适用场景


  • 中等复杂度的业务系统,需要良好的代码结构和可维护性
  • 业务逻辑相对复杂,需要事件驱动的系统设计
  • 团队具备良好的DDD设计理念和架构意识
注意事项


  • 学习曲线:虽然PipelinR本身简单,但要理解CQRS的设计理念需要一定时间
  • 适度使用:CQRS不是银弹,过度设计会增加系统复杂度,要根据实际需求决定是否引入
  • 团队协作:CQRS的有效实施对团队的整体架构意识和编码规范要求较高
  • 性能考虑:虽然使用了中介者模式会引入少量额外开销,但对大多数应用来说可以忽略不计
结论

PipelinR提供了一种轻量级、简洁的CQRS实现方案。它特别适合那些想要在不过度复杂化系统的前提下,引入DDD思想和事件驱动设计的项目。通过合理运用Command和Notification,结合恰当的中间件设计,开发者可以构建出高内聚、低耦合、易于维护和扩展的应用系统。
关键是要把握好"度"——既要充分发挥CQRS和PipelinR的优势,又要避免为了追求"高大上"的架构而过度设计,最终的目标是为业务的快速迭代和长期维护提供支撑。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册