编辑 | blame | 历史 | 原始文档

事件驱动消息推送系统 - 实现总结

功能概述

采用Spring事件驱动架构重构消息推送功能,实现业务逻辑与消息推送的完全解耦,使其他系统可以轻松发布事件来触发消息推送。

架构设计

设计模式

采用**观察者模式**(Observer Pattern)的Spring事件机制实现:

业务系统 --发布--> 事件 --监听--> 消息监听器 --保存--> 消息库

核心组件

graph LR
    A[任务服务] -->|发布事件| B[ApplicationEventPublisher]
    B --> C[TaskCreatedEvent]
    B --> D[TaskAssignedEvent]
    B --> E[TaskStatusChangedEvent]
    C -->|监听| F[TaskMessageListener]
    D -->|监听| F
    E -->|监听| F
    F -->|保存消息| G[sys_message表]

实现内容

1. 事件类设计

1.1 事件基类

文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskEvent.java

public abstract class TaskEvent extends ApplicationEvent {
    private Long taskId;          // 任务ID
    private String taskCode;      // 任务编号
    private Long operatorId;      // 操作人ID
    private String operatorName;  // 操作人姓名
}

1.2 任务创建事件

文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskCreatedEvent.java

public class TaskCreatedEvent extends TaskEvent {
    private String taskType;      // 任务类型
    private Long creatorId;       // 创建人ID
    private String creatorName;   // 创建人姓名
}

1.3 任务分配事件

文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskAssignedEvent.java

public class TaskAssignedEvent extends TaskEvent {
    private List<Long> assigneeIds;     // 执行人ID列表
    private List<String> assigneeNames; // 执行人姓名列表
    private Long assignerId;            // 分配人ID
    private String assignerName;        // 分配人姓名
}

1.4 任务状态变更事件

文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskStatusChangedEvent.java

public class TaskStatusChangedEvent extends TaskEvent {
    private String oldStatus;         // 旧状态
    private String newStatus;         // 新状态
    private String oldStatusDesc;     // 旧状态描述
    private String newStatusDesc;     // 新状态描述
    private List<Long> assigneeIds;   // 执行人ID列表
    private Long creatorId;           // 创建人ID
}

2. 消息监听器

文件: ruoyi-system/src/main/java/com/ruoyi/system/listener/TaskMessageListener.java

2.1 核心特性

  • 异步处理: 使用 @Async 注解,不阻塞主业务
  • 事件监听: 使用 @EventListener 注解监听事件
  • 自动保存: 监听到事件后自动保存消息到数据库
  • 容错处理: 异常不影响主业务流程

2.2 监听器方法

@Component
public class TaskMessageListener {
    
    // 监听任务创建事件
    @Async
    @EventListener
    public void handleTaskCreatedEvent(TaskCreatedEvent event) {
        // 保存创建成功消息
    }
    
    // 监听任务分配事件
    @Async
    @EventListener
    public void handleTaskAssignedEvent(TaskAssignedEvent event) {
        // 给每个执行人发送任务推送消息
    }
    
    // 监听任务状态变更事件
    @Async
    @EventListener
    public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) {
        // 给执行人和创建人发送状态变更消息
    }
}

3. 异步配置

文件: ruoyi-framework/src/main/java/com/ruoyi/framework/config/AsyncConfig.java

@Configuration
@EnableAsync
public class AsyncConfig {
    
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);         // 核心线程数
        executor.setMaxPoolSize(10);         // 最大线程数
        executor.setQueueCapacity(100);      // 队列容量
        executor.setThreadNamePrefix("async-task-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

4. 任务服务集成

文件: ruoyi-system/src/main/java/com/ruoyi/system/service/impl/SysTaskServiceImpl.java

4.1 注入事件发布器

@Autowired
private ApplicationEventPublisher eventPublisher;

4.2 发布事件

创建任务时:
```java
// 发布任务创建事件
eventPublisher.publishEvent(new TaskCreatedEvent(
this, taskId, taskCode, taskType, creatorId, creatorName
));

// 发布任务分配事件
eventPublisher.publishEvent(new TaskAssignedEvent(
this, taskId, taskCode, assigneeIds, assigneeNames, assignerId, assignerName
));
```

分配任务时:
java eventPublisher.publishEvent(new TaskAssignedEvent( this, taskId, taskCode, assigneeIds, null, assignerId, assignerName ));

状态变更时:
java eventPublisher.publishEvent(new TaskStatusChangedEvent( this, taskId, taskCode, oldStatus, newStatus, oldStatusDesc, newStatusDesc, assigneeIds, creatorId ));

架构优势

1. 完全解耦

对比项 直接调用 事件驱动
耦合度 强耦合 完全解耦
依赖关系 需要注入Service 只需发布事件
修改影响 影响业务代码 不影响业务代码
扩展性 需修改业务代码 只需添加监听器

2. 异步处理

同步方式:
业务处理 → 保存消息 → 返回结果
总耗时 = 业务耗时 + 消息耗时

异步方式:
业务处理 → 返回结果
    ↓
发布事件 → 异步保存消息(不影响主流程)
总耗时 ≈ 业务耗时

3. 高扩展性

其他系统需要触发消息推送时,只需:

// 1. 发布事件
eventPublisher.publishEvent(new TaskCreatedEvent(...));

// 2. 无需关心消息如何保存
// 3. 无需修改任何现有代码

4. 易维护性

  • 单一职责: 业务服务只负责业务逻辑,监听器只负责消息推送
  • 独立测试: 可以独立测试事件发布和监听
  • 日志清晰: 异步线程有独立的线程名称前缀

使用指南

其他系统如何使用

场景1: 订单系统推送消息

@Service
public class OrderServiceImpl {
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    public void createOrder(Order order) {
        // 1. 业务逻辑
        orderMapper.insert(order);
        
        // 2. 发布事件(推送消息)
        eventPublisher.publishEvent(new OrderCreatedEvent(
            this, order.getOrderId(), order.getOrderNo(), order.getCreatorId()
        ));
    }
}

场景2: 审批系统推送消息

@Service
public class ApprovalServiceImpl {
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    public void approveTask(Long taskId, String result) {
        // 1. 业务逻辑
        approvalMapper.updateStatus(taskId, result);
        
        // 2. 发布事件(推送消息)
        eventPublisher.publishEvent(new ApprovalCompletedEvent(
            this, taskId, result, approverId, applicantId
        ));
    }
}

添加新的事件类型

步骤1: 创建事件类

package com.ruoyi.system.event;

public class OrderCreatedEvent extends ApplicationEvent {
    private Long orderId;
    private String orderNo;
    private Long creatorId;
    
    public OrderCreatedEvent(Object source, Long orderId, String orderNo, Long creatorId) {
        super(source);
        this.orderId = orderId;
        this.orderNo = orderNo;
        this.creatorId = creatorId;
    }
    
    // getters and setters
}

步骤2: 添加监听器方法

@Component
public class OrderMessageListener {
    
    @Autowired
    private SysMessageMapper sysMessageMapper;
    
    @Async
    @EventListener
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        // 保存消息
        SysMessage message = new SysMessage();
        message.setMessageType("ORDER_CREATE");
        message.setMessageTitle("订单创建成功");
        message.setMessageContent("您的订单已创建成功");
        // ... 设置其他字段
        
        sysMessageMapper.insertSysMessage(message);
    }
}

步骤3: 发布事件

eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId, orderNo, creatorId));

性能优化

1. 线程池配置优化

根据实际负载调整线程池参数:

// 高并发场景
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);

// 低并发场景
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);

2. 批量保存优化

如果同时有大量消息,可以批量保存:

@Async
@EventListener
public void handleTaskAssignedEvent(TaskAssignedEvent event) {
    List<SysMessage> messages = new ArrayList<>();
    
    for (Long assigneeId : event.getAssigneeIds()) {
        SysMessage message = new SysMessage();
        // ... 设置字段
        messages.add(message);
    }
    
    // 批量插入
    sysMessageMapper.batchInsert(messages);
}

3. 消息去重

避免重复发送消息:

@Async
@EventListener
public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) {
    // 使用Set去重
    Set<Long> receiverIds = new HashSet<>();
    receiverIds.addAll(event.getAssigneeIds());
    receiverIds.add(event.getCreatorId());
    
    for (Long receiverId : receiverIds) {
        // 保存消息
    }
}

监控与调试

1. 查看事件日志

2025-10-25 14:30:15.123 [async-task-1] INFO  TaskMessageListener - 收到任务创建事件,任务ID:1001,任务编号:TASK-20251025-001
2025-10-25 14:30:15.234 [async-task-1] INFO  TaskMessageListener - 任务创建消息已保存,消息ID:5001
2025-10-25 14:30:15.345 [async-task-2] INFO  TaskMessageListener - 收到任务分配事件,任务ID:1001,执行人数量:3
2025-10-25 14:30:15.456 [async-task-2] INFO  TaskMessageListener - 任务分配消息已保存,消息ID:5002,接收人:张三

2. 监控线程池状态

@Component
public class ThreadPoolMonitor {
    
    @Autowired
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor executor;
    
    @Scheduled(fixedDelay = 60000) // 每分钟
    public void monitor() {
        log.info("线程池状态 - 活跃线程:{},队列大小:{},完成任务:{}", 
                executor.getActiveCount(),
                executor.getThreadPoolExecutor().getQueue().size(),
                executor.getThreadPoolExecutor().getCompletedTaskCount());
    }
}

注意事项

1. 事务边界

事件发布应该在事务提交之后:

@Transactional
public int insertSysTask(TaskCreateVO createVO) {
    // 1. 保存任务
    int result = sysTaskMapper.insertSysTask(task);
    
    // 2. 在事务内发布事件(事务提交后才会触发监听器)
    if (result > 0) {
        eventPublisher.publishEvent(new TaskCreatedEvent(...));
    }
    
    return result;
}

2. 异常处理

监听器中的异常不会影响主业务:

@Async
@EventListener
public void handleTaskCreatedEvent(TaskCreatedEvent event) {
    try {
        // 处理事件
    } catch (Exception e) {
        log.error("处理任务创建事件失败", e);
        // 不向上抛出异常
    }
}

3. 顺序性

异步事件不保证执行顺序:

// 这两个事件可能以任意顺序执行
eventPublisher.publishEvent(event1);
eventPublisher.publishEvent(event2);

如需保证顺序,使用同步监听器:

@EventListener  // 不使用@Async
public void handleEvent(Event event) {
    // 同步执行,保证顺序
}

对比总结

特性 直接调用Service 事件驱动
代码耦合度
性能影响 同步阻塞 异步非阻塞
扩展性 需修改代码 只需添加监听器
可测试性 需模拟Service 只需发布事件
可维护性 业务代码混杂 职责清晰分离
学习成本
适用场景 简单项目 复杂/大型项目

总结

通过引入Spring事件驱动架构,我们成功实现了:

  1. 业务与消息完全解耦 - 业务代码只需发布事件
  2. 异步非阻塞处理 - 不影响主业务性能
  3. 高度可扩展 - 其他系统轻松集成
  4. 职责清晰 - 监听器专注消息推送
  5. 易于维护 - 独立的事件和监听器管理

这种架构特别适合大型项目和需要高扩展性的系统!


更新时间: 2025-10-25
版本: v2.0
架构: 事件驱动