采用Spring事件驱动架构重构消息推送功能,实现业务逻辑与消息推送的完全解耦,使其他系统可以轻松发布事件来触发消息推送。
采用**观察者模式**(Observer Pattern)的Spring事件机制实现:
业务系统 --发布--> 事件 --监听--> 消息监听器 --保存--> 消息库
graph LR
A[任务服务] -->|发布事件| B[ApplicationEventPublisher]
B --> C[TaskCreatedEvent]
B --> D[TaskAssignedEvent]
B --> E[TaskStatusChangedEvent]
C -->|监听| F[TaskMessageListener]
D -->|监听| F
E -->|监听| F
F -->|保存消息| G[sys_message表]
文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskEvent.java
public abstract class TaskEvent extends ApplicationEvent {
private Long taskId; // 任务ID
private String taskCode; // 任务编号
private Long operatorId; // 操作人ID
private String operatorName; // 操作人姓名
}
文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskCreatedEvent.java
public class TaskCreatedEvent extends TaskEvent {
private String taskType; // 任务类型
private Long creatorId; // 创建人ID
private String creatorName; // 创建人姓名
}
文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskAssignedEvent.java
public class TaskAssignedEvent extends TaskEvent {
private List<Long> assigneeIds; // 执行人ID列表
private List<String> assigneeNames; // 执行人姓名列表
private Long assignerId; // 分配人ID
private String assignerName; // 分配人姓名
}
文件: ruoyi-system/src/main/java/com/ruoyi/system/event/TaskStatusChangedEvent.java
public class TaskStatusChangedEvent extends TaskEvent {
private String oldStatus; // 旧状态
private String newStatus; // 新状态
private String oldStatusDesc; // 旧状态描述
private String newStatusDesc; // 新状态描述
private List<Long> assigneeIds; // 执行人ID列表
private Long creatorId; // 创建人ID
}
文件: ruoyi-system/src/main/java/com/ruoyi/system/listener/TaskMessageListener.java
@Async 注解,不阻塞主业务@EventListener 注解监听事件@Component
public class TaskMessageListener {
// 监听任务创建事件
@Async
@EventListener
public void handleTaskCreatedEvent(TaskCreatedEvent event) {
// 保存创建成功消息
}
// 监听任务分配事件
@Async
@EventListener
public void handleTaskAssignedEvent(TaskAssignedEvent event) {
// 给每个执行人发送任务推送消息
}
// 监听任务状态变更事件
@Async
@EventListener
public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) {
// 给执行人和创建人发送状态变更消息
}
}
文件: ruoyi-framework/src/main/java/com/ruoyi/framework/config/AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(10); // 最大线程数
executor.setQueueCapacity(100); // 队列容量
executor.setThreadNamePrefix("async-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
文件: ruoyi-system/src/main/java/com/ruoyi/system/service/impl/SysTaskServiceImpl.java
@Autowired
private ApplicationEventPublisher eventPublisher;
创建任务时:
```java
// 发布任务创建事件
eventPublisher.publishEvent(new TaskCreatedEvent(
this, taskId, taskCode, taskType, creatorId, creatorName
));
// 发布任务分配事件
eventPublisher.publishEvent(new TaskAssignedEvent(
this, taskId, taskCode, assigneeIds, assigneeNames, assignerId, assignerName
));
```
分配任务时:java eventPublisher.publishEvent(new TaskAssignedEvent( this, taskId, taskCode, assigneeIds, null, assignerId, assignerName ));
状态变更时:java eventPublisher.publishEvent(new TaskStatusChangedEvent( this, taskId, taskCode, oldStatus, newStatus, oldStatusDesc, newStatusDesc, assigneeIds, creatorId ));
| 对比项 | 直接调用 | 事件驱动 |
|---|---|---|
| 耦合度 | 强耦合 | 完全解耦 |
| 依赖关系 | 需要注入Service | 只需发布事件 |
| 修改影响 | 影响业务代码 | 不影响业务代码 |
| 扩展性 | 需修改业务代码 | 只需添加监听器 |
同步方式:
业务处理 → 保存消息 → 返回结果
总耗时 = 业务耗时 + 消息耗时
异步方式:
业务处理 → 返回结果
↓
发布事件 → 异步保存消息(不影响主流程)
总耗时 ≈ 业务耗时
其他系统需要触发消息推送时,只需:
// 1. 发布事件
eventPublisher.publishEvent(new TaskCreatedEvent(...));
// 2. 无需关心消息如何保存
// 3. 无需修改任何现有代码
@Service
public class OrderServiceImpl {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void createOrder(Order order) {
// 1. 业务逻辑
orderMapper.insert(order);
// 2. 发布事件(推送消息)
eventPublisher.publishEvent(new OrderCreatedEvent(
this, order.getOrderId(), order.getOrderNo(), order.getCreatorId()
));
}
}
@Service
public class ApprovalServiceImpl {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void approveTask(Long taskId, String result) {
// 1. 业务逻辑
approvalMapper.updateStatus(taskId, result);
// 2. 发布事件(推送消息)
eventPublisher.publishEvent(new ApprovalCompletedEvent(
this, taskId, result, approverId, applicantId
));
}
}
package com.ruoyi.system.event;
public class OrderCreatedEvent extends ApplicationEvent {
private Long orderId;
private String orderNo;
private Long creatorId;
public OrderCreatedEvent(Object source, Long orderId, String orderNo, Long creatorId) {
super(source);
this.orderId = orderId;
this.orderNo = orderNo;
this.creatorId = creatorId;
}
// getters and setters
}
@Component
public class OrderMessageListener {
@Autowired
private SysMessageMapper sysMessageMapper;
@Async
@EventListener
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
// 保存消息
SysMessage message = new SysMessage();
message.setMessageType("ORDER_CREATE");
message.setMessageTitle("订单创建成功");
message.setMessageContent("您的订单已创建成功");
// ... 设置其他字段
sysMessageMapper.insertSysMessage(message);
}
}
eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId, orderNo, creatorId));
根据实际负载调整线程池参数:
// 高并发场景
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
// 低并发场景
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
如果同时有大量消息,可以批量保存:
@Async
@EventListener
public void handleTaskAssignedEvent(TaskAssignedEvent event) {
List<SysMessage> messages = new ArrayList<>();
for (Long assigneeId : event.getAssigneeIds()) {
SysMessage message = new SysMessage();
// ... 设置字段
messages.add(message);
}
// 批量插入
sysMessageMapper.batchInsert(messages);
}
避免重复发送消息:
@Async
@EventListener
public void handleTaskStatusChangedEvent(TaskStatusChangedEvent event) {
// 使用Set去重
Set<Long> receiverIds = new HashSet<>();
receiverIds.addAll(event.getAssigneeIds());
receiverIds.add(event.getCreatorId());
for (Long receiverId : receiverIds) {
// 保存消息
}
}
2025-10-25 14:30:15.123 [async-task-1] INFO TaskMessageListener - 收到任务创建事件,任务ID:1001,任务编号:TASK-20251025-001
2025-10-25 14:30:15.234 [async-task-1] INFO TaskMessageListener - 任务创建消息已保存,消息ID:5001
2025-10-25 14:30:15.345 [async-task-2] INFO TaskMessageListener - 收到任务分配事件,任务ID:1001,执行人数量:3
2025-10-25 14:30:15.456 [async-task-2] INFO TaskMessageListener - 任务分配消息已保存,消息ID:5002,接收人:张三
@Component
public class ThreadPoolMonitor {
@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor executor;
@Scheduled(fixedDelay = 60000) // 每分钟
public void monitor() {
log.info("线程池状态 - 活跃线程:{},队列大小:{},完成任务:{}",
executor.getActiveCount(),
executor.getThreadPoolExecutor().getQueue().size(),
executor.getThreadPoolExecutor().getCompletedTaskCount());
}
}
事件发布应该在事务提交之后:
@Transactional
public int insertSysTask(TaskCreateVO createVO) {
// 1. 保存任务
int result = sysTaskMapper.insertSysTask(task);
// 2. 在事务内发布事件(事务提交后才会触发监听器)
if (result > 0) {
eventPublisher.publishEvent(new TaskCreatedEvent(...));
}
return result;
}
监听器中的异常不会影响主业务:
@Async
@EventListener
public void handleTaskCreatedEvent(TaskCreatedEvent event) {
try {
// 处理事件
} catch (Exception e) {
log.error("处理任务创建事件失败", e);
// 不向上抛出异常
}
}
异步事件不保证执行顺序:
// 这两个事件可能以任意顺序执行
eventPublisher.publishEvent(event1);
eventPublisher.publishEvent(event2);
如需保证顺序,使用同步监听器:
@EventListener // 不使用@Async
public void handleEvent(Event event) {
// 同步执行,保证顺序
}
| 特性 | 直接调用Service | 事件驱动 |
|---|---|---|
| 代码耦合度 | 高 | 低 |
| 性能影响 | 同步阻塞 | 异步非阻塞 |
| 扩展性 | 需修改代码 | 只需添加监听器 |
| 可测试性 | 需模拟Service | 只需发布事件 |
| 可维护性 | 业务代码混杂 | 职责清晰分离 |
| 学习成本 | 低 | 中 |
| 适用场景 | 简单项目 | 复杂/大型项目 |
通过引入Spring事件驱动架构,我们成功实现了:
这种架构特别适合大型项目和需要高扩展性的系统!
更新时间: 2025-10-25
版本: v2.0
架构: 事件驱动