作者:田燕青 职位:后期工程师
延时队列,顾名思义,就是元素在入队列时,会指定一个延时时间,期望在经过指定时间后再处理该元素。
延时队列适用的场景有很多:
即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。
对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min 5 min 10 min 30 min 1hour 等间隔进行回调。
比如我们用的lark会议,在会议开始前10分钟对参会人进行提醒,这个功能也可以采用延时队列来实现。
比如用户下单未支付时,系统在关单前10分钟提醒用户去支付。比如我曾做过的二手车的一个需求就是提醒买手尽快出价等。
这些场景都用到了延时队列,其实上述场景采用定时任务也能实现,但是相比于定时任务,延时队的时间把控更精准,延时队列不用扫描库表,对系统消耗更少。
•DelayQueue,这个是jdk自带的一种延时队列,位于java.util.concurrent 包下,它是一个有界的阻塞队列,它内部封装了一个 PriorityQueue(优先队列)
PriorityQueue 内部使用完全二叉堆来实现队列元素排序,当向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。有兴趣的同学可以详细查看源码。
•Quartz 定时任务,对时间精准度要求不高,数据量较小的任务,可以采用定时任务替代延时队列。
•redis 过期回调,我们可以开启监听key 是否过期的事件,一旦key 过期会触发一个callback 事件。这样我们就能通过设置key的过期时间,来实现延时队列的效果。
•redis sorted set,主要是利用 zset 的score 属性,我们将延时时间转成(当前时间+延时时间)(时间单位毫秒)作为scroe 属性。然后开启一个消费线程轮训redis队列,当score属性的值小于当前时间时,证明延时消息到期,可以进行消费。
•Mq,通过rabbitMq 或 rocketMq 可以实现延时队列,具体实现方式省略。
•时间轮,基于kafka 或 netty 的时间轮算法来实现延时队列,实现方式省略,有兴趣的同学可以自己查看。
实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对qps稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。
我在项目中遇到一个使用延时队列的场景,因为项目中使用了redis,所以我用redis 实现了一个延时队列来满足需求,下面介绍下如何用redis 实现一个延时队列。
通过 redis 实现延时队列有两种方式,第一种是redis 的过期回调,但是这种方式需要修改redis 的配置文件并重启服务,这对于我们正在使用的 redis 服务来说比较困难。
所以我们用第二种方式来实现一个延时队列:
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 新增一个延时任务,添加job元信息
*
* @param job 元信息,job中包含 taskId 主键,topicId 队列id,retryNum 重试次数,delaytime 延时时间 等,此处不展开。
*/
@Override
public void addJob(Job job) {
//添加分布式锁,防止重复添加任务
RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. 将job添加到 JobPool中,jobpool 作为一个全局索引,所有未执行任务都存在jobPool 中。
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
}
jobPool.put(topicId, job);
// 2. 将job添加到 DelayBucket中,按延时时间进行排序
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("add delay job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 删除job信息,为什么要删除job 信息?
* 当我们确信上一个延时任务没有必要执行时,我们可以提前取消延时任务的执行。
*
* @param jobDie 元信息
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
//从全局索引中删除任务。
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
//从zset 中删除任务
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("delete job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
搬运线程的目的是将 zset 中已经到期的任务搬运到消费队列中,消费队列中的任务会被消费线程消费。之所以会增加一个消费队列,是考虑到我们的消费能力和数据安全,如果消费能力比较弱,可能会造成消费线程阻塞,或者数据丢失。我们把到期任务放到一个阻塞队列中,可以让消费线程顺序消费。
这个地方还能继续优化,比如可以落库,可以建立多个阻塞队列,每个阻塞队列可以指定一个线程池进行消费等。
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* 启动定时开启搬运JOB信息
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
//System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
// 将到期的任务取出来
RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
if (CollectionUtils.isEmpty(jobCollection)) {
return;
}
// 将到期的任务搬运到消费队列中,zset 中的任务删除
List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
if (CollectionUtils.isEmpty(jobList)) {
return;
}
if (readyQueue.addAll(jobList)) {
bucketSet.removeAllAsync(jobList);
}
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
开启一个消费线程,消费线程会消费阻塞队列中的到期任务,其中 ConsumerService 可以采用策略模式,根据不同的topic 进行不同的业务处理:
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC消费
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
}
/**
* 开启TOPIC消费线程
* 将所有可能出现的异常全部catch住,确保While(true)能够不中断
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. 获取ReadyQueue中待消费的数据
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. 获取job元信息内容
RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. 消费
FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 消费成功,删除JobPool和DelayBucket的job信息
jobPoolMap.remove(topicId);
} else {
/**
* 重试次数为0就直接返回
*/
if (job.getRetry() == 0) {
return;
}
int retrySum = job.getRetry() + 1;
// 3.2 消费失败,则根据策略重新加入Bucket
// 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 更新元信息失败次数
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}