# 事件驱动消息推送系统 - 实现总结 ## 功能概述 采用Spring事件驱动架构重构消息推送功能,实现业务逻辑与消息推送的完全解耦,使其他系统可以轻松发布事件来触发消息推送。 ## 架构设计 ### 设计模式 采用**观察者模式**(Observer Pattern)的Spring事件机制实现: ``` 业务系统 --发布--> 事件 --监听--> 消息监听器 --保存--> 消息库 ``` ### 核心组件 ```mermaid graph LR A[任务服务] -->|发布事件| B[ApplicationEventPublisher] B --> C[TaskCreatedEvent] B --> D[TaskAssignedEvent] B --> E[TaskStatusChangedEvent] C -->|监听| F[TaskMessageListener] D -->|监听| F E -->|监听| F F -->|保存消息| G[sys_message表] ``` ## 实现内容 ### 1. 事件类设计 #### 1.1 事件基类 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/event/TaskEvent.java` ```java public abstract class TaskEvent extends ApplicationEvent { private Long taskId; // 任务ID private String taskCode; // 任务编号 private Long operatorId; // 操作人ID private String operatorName; // 操作人姓名 } ``` #### 1.2 任务创建事件 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/event/TaskCreatedEvent.java` ```java public class TaskCreatedEvent extends TaskEvent { private String taskType; // 任务类型 private Long creatorId; // 创建人ID private String creatorName; // 创建人姓名 } ``` #### 1.3 任务分配事件 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/event/TaskAssignedEvent.java` ```java public class TaskAssignedEvent extends TaskEvent { private List assigneeIds; // 执行人ID列表 private List assigneeNames; // 执行人姓名列表 private Long assignerId; // 分配人ID private String assignerName; // 分配人姓名 } ``` #### 1.4 任务状态变更事件 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/event/TaskStatusChangedEvent.java` ```java public class TaskStatusChangedEvent extends TaskEvent { private String oldStatus; // 旧状态 private String newStatus; // 新状态 private String oldStatusDesc; // 旧状态描述 private String newStatusDesc; // 新状态描述 private List assigneeIds; // 执行人ID列表 private Long creatorId; // 创建人ID } ``` ### 2. 消息监听器 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/listener/TaskMessageListener.java` #### 2.1 核心特性 - ✅ **异步处理**: 使用 `@Async` 注解,不阻塞主业务 - ✅ **事件监听**: 使用 `@EventListener` 注解监听事件 - ✅ **自动保存**: 监听到事件后自动保存消息到数据库 - ✅ **容错处理**: 异常不影响主业务流程 #### 2.2 监听器方法 ```java @Component public class TaskMessageListener { // 监听任务创建事件 @Async @EventListener public void handleTaskCreatedEvent(TaskCreatedEvent event) { // 保存创建成功消息 } // 监听任务分配事件 @Async @EventListener public void handleTaskAssignedEvent(TaskAssignedEvent event) { // 给每个执行人发送任务推送消息 } // 监听任务状态变更事件 @Async @EventListener public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) { // 给执行人和创建人发送状态变更消息 } } ``` ### 3. 异步配置 **文件**: `ruoyi-framework/src/main/java/com/ruoyi/framework/config/AsyncConfig.java` ```java @Configuration @EnableAsync public class AsyncConfig { @Bean(name = "taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); // 核心线程数 executor.setMaxPoolSize(10); // 最大线程数 executor.setQueueCapacity(100); // 队列容量 executor.setThreadNamePrefix("async-task-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } } ``` ### 4. 任务服务集成 **文件**: `ruoyi-system/src/main/java/com/ruoyi/system/service/impl/SysTaskServiceImpl.java` #### 4.1 注入事件发布器 ```java @Autowired private ApplicationEventPublisher eventPublisher; ``` #### 4.2 发布事件 **创建任务时**: ```java // 发布任务创建事件 eventPublisher.publishEvent(new TaskCreatedEvent( this, taskId, taskCode, taskType, creatorId, creatorName )); // 发布任务分配事件 eventPublisher.publishEvent(new TaskAssignedEvent( this, taskId, taskCode, assigneeIds, assigneeNames, assignerId, assignerName )); ``` **分配任务时**: ```java eventPublisher.publishEvent(new TaskAssignedEvent( this, taskId, taskCode, assigneeIds, null, assignerId, assignerName )); ``` **状态变更时**: ```java eventPublisher.publishEvent(new TaskStatusChangedEvent( this, taskId, taskCode, oldStatus, newStatus, oldStatusDesc, newStatusDesc, assigneeIds, creatorId )); ``` ## 架构优势 ### 1. 完全解耦 | 对比项 | 直接调用 | 事件驱动 | |--------|---------|---------| | 耦合度 | 强耦合 | 完全解耦 | | 依赖关系 | 需要注入Service | 只需发布事件 | | 修改影响 | 影响业务代码 | 不影响业务代码 | | 扩展性 | 需修改业务代码 | 只需添加监听器 | ### 2. 异步处理 ``` 同步方式: 业务处理 → 保存消息 → 返回结果 总耗时 = 业务耗时 + 消息耗时 异步方式: 业务处理 → 返回结果 ↓ 发布事件 → 异步保存消息(不影响主流程) 总耗时 ≈ 业务耗时 ``` ### 3. 高扩展性 其他系统需要触发消息推送时,只需: ```java // 1. 发布事件 eventPublisher.publishEvent(new TaskCreatedEvent(...)); // 2. 无需关心消息如何保存 // 3. 无需修改任何现有代码 ``` ### 4. 易维护性 - **单一职责**: 业务服务只负责业务逻辑,监听器只负责消息推送 - **独立测试**: 可以独立测试事件发布和监听 - **日志清晰**: 异步线程有独立的线程名称前缀 ## 使用指南 ### 其他系统如何使用 #### 场景1: 订单系统推送消息 ```java @Service public class OrderServiceImpl { @Autowired private ApplicationEventPublisher eventPublisher; public void createOrder(Order order) { // 1. 业务逻辑 orderMapper.insert(order); // 2. 发布事件(推送消息) eventPublisher.publishEvent(new OrderCreatedEvent( this, order.getOrderId(), order.getOrderNo(), order.getCreatorId() )); } } ``` #### 场景2: 审批系统推送消息 ```java @Service public class ApprovalServiceImpl { @Autowired private ApplicationEventPublisher eventPublisher; public void approveTask(Long taskId, String result) { // 1. 业务逻辑 approvalMapper.updateStatus(taskId, result); // 2. 发布事件(推送消息) eventPublisher.publishEvent(new ApprovalCompletedEvent( this, taskId, result, approverId, applicantId )); } } ``` ### 添加新的事件类型 #### 步骤1: 创建事件类 ```java package com.ruoyi.system.event; public class OrderCreatedEvent extends ApplicationEvent { private Long orderId; private String orderNo; private Long creatorId; public OrderCreatedEvent(Object source, Long orderId, String orderNo, Long creatorId) { super(source); this.orderId = orderId; this.orderNo = orderNo; this.creatorId = creatorId; } // getters and setters } ``` #### 步骤2: 添加监听器方法 ```java @Component public class OrderMessageListener { @Autowired private SysMessageMapper sysMessageMapper; @Async @EventListener public void handleOrderCreatedEvent(OrderCreatedEvent event) { // 保存消息 SysMessage message = new SysMessage(); message.setMessageType("ORDER_CREATE"); message.setMessageTitle("订单创建成功"); message.setMessageContent("您的订单已创建成功"); // ... 设置其他字段 sysMessageMapper.insertSysMessage(message); } } ``` #### 步骤3: 发布事件 ```java eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId, orderNo, creatorId)); ``` ## 性能优化 ### 1. 线程池配置优化 根据实际负载调整线程池参数: ```java // 高并发场景 executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); // 低并发场景 executor.setCorePoolSize(3); executor.setMaxPoolSize(5); executor.setQueueCapacity(50); ``` ### 2. 批量保存优化 如果同时有大量消息,可以批量保存: ```java @Async @EventListener public void handleTaskAssignedEvent(TaskAssignedEvent event) { List messages = new ArrayList<>(); for (Long assigneeId : event.getAssigneeIds()) { SysMessage message = new SysMessage(); // ... 设置字段 messages.add(message); } // 批量插入 sysMessageMapper.batchInsert(messages); } ``` ### 3. 消息去重 避免重复发送消息: ```java @Async @EventListener public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) { // 使用Set去重 Set receiverIds = new HashSet<>(); receiverIds.addAll(event.getAssigneeIds()); receiverIds.add(event.getCreatorId()); for (Long receiverId : receiverIds) { // 保存消息 } } ``` ## 监控与调试 ### 1. 查看事件日志 ``` 2025-10-25 14:30:15.123 [async-task-1] INFO TaskMessageListener - 收到任务创建事件,任务ID:1001,任务编号:TASK-20251025-001 2025-10-25 14:30:15.234 [async-task-1] INFO TaskMessageListener - 任务创建消息已保存,消息ID:5001 2025-10-25 14:30:15.345 [async-task-2] INFO TaskMessageListener - 收到任务分配事件,任务ID:1001,执行人数量:3 2025-10-25 14:30:15.456 [async-task-2] INFO TaskMessageListener - 任务分配消息已保存,消息ID:5002,接收人:张三 ``` ### 2. 监控线程池状态 ```java @Component public class ThreadPoolMonitor { @Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor executor; @Scheduled(fixedDelay = 60000) // 每分钟 public void monitor() { log.info("线程池状态 - 活跃线程:{},队列大小:{},完成任务:{}", executor.getActiveCount(), executor.getThreadPoolExecutor().getQueue().size(), executor.getThreadPoolExecutor().getCompletedTaskCount()); } } ``` ## 注意事项 ### 1. 事务边界 事件发布应该在事务提交之后: ```java @Transactional public int insertSysTask(TaskCreateVO createVO) { // 1. 保存任务 int result = sysTaskMapper.insertSysTask(task); // 2. 在事务内发布事件(事务提交后才会触发监听器) if (result > 0) { eventPublisher.publishEvent(new TaskCreatedEvent(...)); } return result; } ``` ### 2. 异常处理 监听器中的异常不会影响主业务: ```java @Async @EventListener public void handleTaskCreatedEvent(TaskCreatedEvent event) { try { // 处理事件 } catch (Exception e) { log.error("处理任务创建事件失败", e); // 不向上抛出异常 } } ``` ### 3. 顺序性 异步事件不保证执行顺序: ```java // 这两个事件可能以任意顺序执行 eventPublisher.publishEvent(event1); eventPublisher.publishEvent(event2); ``` 如需保证顺序,使用同步监听器: ```java @EventListener // 不使用@Async public void handleEvent(Event event) { // 同步执行,保证顺序 } ``` ## 对比总结 | 特性 | 直接调用Service | 事件驱动 | |------|---------------|---------| | 代码耦合度 | 高 | 低 | | 性能影响 | 同步阻塞 | 异步非阻塞 | | 扩展性 | 需修改代码 | 只需添加监听器 | | 可测试性 | 需模拟Service | 只需发布事件 | | 可维护性 | 业务代码混杂 | 职责清晰分离 | | 学习成本 | 低 | 中 | | 适用场景 | 简单项目 | 复杂/大型项目 | ## 总结 通过引入Spring事件驱动架构,我们成功实现了: 1. ✅ **业务与消息完全解耦** - 业务代码只需发布事件 2. ✅ **异步非阻塞处理** - 不影响主业务性能 3. ✅ **高度可扩展** - 其他系统轻松集成 4. ✅ **职责清晰** - 监听器专注消息推送 5. ✅ **易于维护** - 独立的事件和监听器管理 这种架构特别适合大型项目和需要高扩展性的系统! --- **更新时间**: 2025-10-25 **版本**: v2.0 **架构**: 事件驱动