编辑 | blame | 历史 | 原始文档

1. 延迟队列 简介

使用场景:30分钟后通知用户会议开始,订单未支付15分钟后自动关闭。

2.用前须知

延迟队列,本身是**不可靠**的,在部分情况下,消息可能丢失,所以,需要做一个定时任务来做消息补偿。

Q:既然不可靠,为什么要用它?

A1:只有在例如你正在重启服务的时候,消息可能丢失,而且由于业务系统多半是集群,不会同时重启,它有多可靠取决于你redis和业务系统是否可靠。这里说它不可靠,是因为它是没有ACK机制的。

A2:使用延迟队列,可控制到秒级的通知,比秒级定时任务要节约很多资源。而后备的消息补偿定时任务就可以做到分钟级(相当于是不可靠时的服务降级)。

3. 基本原理(可跳过)

系统默认给出的是基于redis实现的延迟队列。其核心利用的是redis键失效通知。

@Override
public Boolean publishTask(Integer code, String value, Integer delay) {
    if (delay < 0) {
        delay = 1;
    }
    cacheComponent.putRaw(assembleKey(code, value), "", delay);
    return true;
}

当业务层发布任务时,会将code(消息类型),value(消息体)组装成key,放入redis中。并且指定过期时间为delay秒。

当key过期时,redis会通知到所有的连接,key已经过期。我们可直接通过Spring提供的redis MessageListener来监听该事件。

package org.springframework.data.redis.connection;

import org.springframework.lang.Nullable;

public interface MessageListener {

   /**
    * Callback for processing received objects through Redis.
    *
    * @param message message must not be {@literal null}.
    * @param pattern pattern matching the channel (if specified) - can be {@literal null}.
    */
   void onMessage(Message message, @Nullable byte[] pattern);
}

接到通知时,我们将key解析,然后路由到相关的handler上进行处理即可。

4. 接口

DelayedMessageQueue 对业务系统提供调用的API 您也可以通过实现此接口,打造自己的延迟队列。篇幅有限,更多注释参照Java类。

public interface DelayedMessageQueue {
	
	...
    public Boolean publishTask(Integer code, String value, Integer delay);
	...
    public Boolean deleteTask(Integer code, String value);

    /**
     * 获取指定任务还有多少时间执行,如果不存在,返回-2
     */
    public Long getTaskTime(Integer code, String value);
}

延迟队列消息处理器,getCode需要返回业务系统内独一无二的处理器编号。handle则是具体的业务处理,由业务系统决定。

/**
 * 延迟消息处理器
 */
public interface DelayedMessageHandler {

    /**
     *
     * @param value
     * @return 处理成功的返回大于0结果,失败返回0
     */
    public int handle(String value);

    public int getCode();
}

5. 使用方法

5.1. 启用

@EnableDelayedMQ
public class PromallApplication {
    // 在启动类上注解上,开启延迟队列的注解
}

5.2. 编写Handler

public class OrderAutoCancelHandler implements DelayedMessageHandler {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public int handle(String orderNo) {
    	try {
    		OrderDO orderDO = orderBizService.checkOrderExistByNo(orderNo, null).get(0);
    	...
    }
    
    @Override
    public int getCode() {
        return DMQHandlerType.ORDER_AUTO_CANCEL.getCode();
    }
    
}

5.3. 将Handler全部加入IoC

public class DelayedMessageHandlerConfig {

    @Bean
    public OrderAutoCancelHandler orderAutoCancelHandler() {
        return new OrderAutoCancelHandler();
    }

    @Bean
    public OrderAutoConfirmHandler orderAutoConfirmHandler() {
        return new OrderAutoConfirmHandler();
    }
    ...

5.4. 发送延迟消息 & 中途取消任务

@Autowired
private DelayedMessageQueue delayedMessageQueue;
...
// 发送延迟消息
delayedMessageQueue.publishTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), childOrderNo, unimallOrderProperties.getAutoCancelTime().intValue());
...
// 中途取消
delayedMessageQueue.deleteTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), orderNo);

6. 特别注意

消息体value的长度不能过长,不能超过redis的限制,否则会报错,若有大对象需要传递,建议是放在缓存或数据库中,将缓存的Key或数据的主键传过去。