### 1. 延迟队列 简介 > 使用场景:30分钟后通知用户会议开始,订单未支付15分钟后自动关闭。 ### 2.用前须知 延迟队列,本身是**不可靠**的,在部分情况下,消息可能丢失,所以,需要做一个定时任务来做消息补偿。 Q:既然不可靠,为什么要用它? A1:只有在例如你正在重启服务的时候,消息可能丢失,而且由于业务系统多半是集群,不会同时重启,它有多可靠取决于你redis和业务系统是否可靠。这里说它不可靠,是因为它是没有ACK机制的。 A2:使用延迟队列,可控制到秒级的通知,比秒级定时任务要节约很多资源。而后备的消息补偿定时任务就可以做到分钟级(相当于是不可靠时的服务降级)。 ### 3. 基本原理(可跳过) 系统默认给出的是基于redis实现的延迟队列。其核心利用的是redis键失效通知。 ```java @Override public Boolean publishTask(Integer code, String value, Integer delay) { if (delay < 0) { delay = 1; } cacheComponent.putRaw(assembleKey(code, value), "", delay); return true; } ``` 当业务层发布任务时,会将code(消息类型),value(消息体)组装成key,放入redis中。并且指定过期时间为delay秒。 当key过期时,redis会通知到所有的连接,key已经过期。我们可直接通过Spring提供的redis MessageListener来监听该事件。 ```java package org.springframework.data.redis.connection; import org.springframework.lang.Nullable; public interface MessageListener { /** * Callback for processing received objects through Redis. * * @param message message must not be {@literal null}. * @param pattern pattern matching the channel (if specified) - can be {@literal null}. */ void onMessage(Message message, @Nullable byte[] pattern); } ``` 接到通知时,我们将key解析,然后路由到相关的handler上进行处理即可。 ### 4. 接口 DelayedMessageQueue 对业务系统提供调用的API 您也可以通过实现此接口,打造自己的延迟队列。篇幅有限,更多注释参照Java类。 ```java public interface DelayedMessageQueue { ... public Boolean publishTask(Integer code, String value, Integer delay); ... public Boolean deleteTask(Integer code, String value); /** * 获取指定任务还有多少时间执行,如果不存在,返回-2 */ public Long getTaskTime(Integer code, String value); } ``` 延迟队列消息处理器,getCode需要返回业务系统内独一无二的处理器编号。handle则是具体的业务处理,由业务系统决定。 ```java /** * 延迟消息处理器 */ public interface DelayedMessageHandler { /** * * @param value * @return 处理成功的返回大于0结果,失败返回0 */ public int handle(String value); public int getCode(); } ``` ### 5. 使用方法 #### 5.1. 启用 ```java @EnableDelayedMQ public class PromallApplication { // 在启动类上注解上,开启延迟队列的注解 } ``` #### 5.2. 编写Handler ```java public class OrderAutoCancelHandler implements DelayedMessageHandler { @Override @Transactional(rollbackFor = Exception.class) public int handle(String orderNo) { try { OrderDO orderDO = orderBizService.checkOrderExistByNo(orderNo, null).get(0); ... } @Override public int getCode() { return DMQHandlerType.ORDER_AUTO_CANCEL.getCode(); } } ``` #### 5.3. 将Handler全部加入IoC ```java public class DelayedMessageHandlerConfig { @Bean public OrderAutoCancelHandler orderAutoCancelHandler() { return new OrderAutoCancelHandler(); } @Bean public OrderAutoConfirmHandler orderAutoConfirmHandler() { return new OrderAutoConfirmHandler(); } ... ``` #### 5.4. 发送延迟消息 & 中途取消任务 ```java @Autowired private DelayedMessageQueue delayedMessageQueue; ... // 发送延迟消息 delayedMessageQueue.publishTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), childOrderNo, unimallOrderProperties.getAutoCancelTime().intValue()); ... // 中途取消 delayedMessageQueue.deleteTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), orderNo); ``` ### 6. 特别注意 消息体value的长度不能过长,不能超过redis的限制,否则会报错,若有大对象需要传递,建议是放在缓存或数据库中,将缓存的Key或数据的主键传过去。