使用场景:30分钟后通知用户会议开始,订单未支付15分钟后自动关闭。
延迟队列,本身是**不可靠**的,在部分情况下,消息可能丢失,所以,需要做一个定时任务来做消息补偿。
Q:既然不可靠,为什么要用它?
A1:只有在例如你正在重启服务的时候,消息可能丢失,而且由于业务系统多半是集群,不会同时重启,它有多可靠取决于你redis和业务系统是否可靠。这里说它不可靠,是因为它是没有ACK机制的。
A2:使用延迟队列,可控制到秒级的通知,比秒级定时任务要节约很多资源。而后备的消息补偿定时任务就可以做到分钟级(相当于是不可靠时的服务降级)。
系统默认给出的是基于redis实现的延迟队列。其核心利用的是redis键失效通知。
@Override
public Boolean publishTask(Integer code, String value, Integer delay) {
if (delay < 0) {
delay = 1;
}
cacheComponent.putRaw(assembleKey(code, value), "", delay);
return true;
}
当业务层发布任务时,会将code(消息类型),value(消息体)组装成key,放入redis中。并且指定过期时间为delay秒。
当key过期时,redis会通知到所有的连接,key已经过期。我们可直接通过Spring提供的redis MessageListener来监听该事件。
package org.springframework.data.redis.connection;
import org.springframework.lang.Nullable;
public interface MessageListener {
/**
* Callback for processing received objects through Redis.
*
* @param message message must not be {@literal null}.
* @param pattern pattern matching the channel (if specified) - can be {@literal null}.
*/
void onMessage(Message message, @Nullable byte[] pattern);
}
接到通知时,我们将key解析,然后路由到相关的handler上进行处理即可。
DelayedMessageQueue 对业务系统提供调用的API 您也可以通过实现此接口,打造自己的延迟队列。篇幅有限,更多注释参照Java类。
public interface DelayedMessageQueue {
...
public Boolean publishTask(Integer code, String value, Integer delay);
...
public Boolean deleteTask(Integer code, String value);
/**
* 获取指定任务还有多少时间执行,如果不存在,返回-2
*/
public Long getTaskTime(Integer code, String value);
}
延迟队列消息处理器,getCode需要返回业务系统内独一无二的处理器编号。handle则是具体的业务处理,由业务系统决定。
/**
* 延迟消息处理器
*/
public interface DelayedMessageHandler {
/**
*
* @param value
* @return 处理成功的返回大于0结果,失败返回0
*/
public int handle(String value);
public int getCode();
}
@EnableDelayedMQ
public class PromallApplication {
// 在启动类上注解上,开启延迟队列的注解
}
public class OrderAutoCancelHandler implements DelayedMessageHandler {
@Override
@Transactional(rollbackFor = Exception.class)
public int handle(String orderNo) {
try {
OrderDO orderDO = orderBizService.checkOrderExistByNo(orderNo, null).get(0);
...
}
@Override
public int getCode() {
return DMQHandlerType.ORDER_AUTO_CANCEL.getCode();
}
}
public class DelayedMessageHandlerConfig {
@Bean
public OrderAutoCancelHandler orderAutoCancelHandler() {
return new OrderAutoCancelHandler();
}
@Bean
public OrderAutoConfirmHandler orderAutoConfirmHandler() {
return new OrderAutoConfirmHandler();
}
...
@Autowired
private DelayedMessageQueue delayedMessageQueue;
...
// 发送延迟消息
delayedMessageQueue.publishTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), childOrderNo, unimallOrderProperties.getAutoCancelTime().intValue());
...
// 中途取消
delayedMessageQueue.deleteTask(DMQHandlerType.ORDER_AUTO_CANCEL.getCode(), orderNo);
消息体value的长度不能过长,不能超过redis的限制,否则会报错,若有大对象需要传递,建议是放在缓存或数据库中,将缓存的Key或数据的主键传过去。