作者:田燕青 职位:后期工程师
延时队列,顾名思义,就是元素在入队列时,会指定一个延时时间,期望在经过指定时间后再处理该元素。
延时队列适用的场景有很多:
即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。
对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min 5 min 10 min 30 min 1hour 等间隔进行回调。
比如我们用的lark会议,在会议开始前10分钟对参会人进行提醒,这个功能也可以采用延时队列来实现。
比如用户下单未支付时,系统在关单前10分钟提醒用户去支付。比如我曾做过的二手车的一个需求就是提醒买手尽快出价等。
这些场景都用到了延时队列,其实上述场景采用定时任务也能实现,但是相比于定时任务,延时队的时间把控更精准,延时队列不用扫描库表,对系统消耗更少。
•DelayQueue,这个是jdk自带的一种延时队列,位于java.util.concurrent 包下,它是一个有界的阻塞队列,它内部封装了一个 PriorityQueue(优先队列)
PriorityQueue 内部使用完全二叉堆来实现队列元素排序,当向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。有兴趣的同学可以详细查看源码。
•Quartz 定时任务,对时间精准度要求不高,数据量较小的任务,可以采用定时任务替代延时队列。
•redis 过期回调,我们可以开启监听key 是否过期的事件,一旦key 过期会触发一个callback 事件。这样我们就能通过设置key的过期时间,来实现延时队列的效果。
•redis sorted set,主要是利用 zset 的score 属性,我们将延时时间转成(当前时间+延时时间)(时间单位毫秒)作为scroe 属性。然后开启一个消费线程轮训redis队列,当score属性的值小于当前时间时,证明延时消息到期,可以进行消费。
•Mq,通过rabbitMq 或 rocketMq 可以实现延时队列,具体实现方式省略。
•时间轮,基于kafka 或 netty 的时间轮算法来实现延时队列,实现方式省略,有兴趣的同学可以自己查看。
实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对qps稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。
我在项目中遇到一个使用延时队列的场景,因为项目中使用了redis,所以我用redis 实现了一个延时队列来满足需求,下面介绍下如何用redis 实现一个延时队列。
通过 redis 实现延时队列有两种方式,第一种是redis 的过期回调,但是这种方式需要修改redis 的配置文件并重启服务,这对于我们正在使用的 redis 服务来说比较困难。
所以我们用第二种方式来实现一个延时队列:



public class RedisDelayQueueServiceImpl implements RedisDelayQueueService { @Autowiredprivate RedissonClient redissonClient; /*** 新增一个延时任务,添加job元信息** @param job 元信息,job中包含 taskId 主键,topicId 队列id,retryNum 重试次数,delaytime 延时时间 等,此处不展开。*/@Overridepublic void addJob(Job job) {//添加分布式锁,防止重复添加任务RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());try {boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);if (!lockFlag) {throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());}String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId()); // 1. 将job添加到 JobPool中,jobpool 作为一个全局索引,所有未执行任务都存在jobPool 中。RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);if (jobPool.get(topicId) != null) {throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());} jobPool.put(topicId, job); // 2. 将job添加到 DelayBucket中,按延时时间进行排序RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);delayBucket.add(job.getDelay(), topicId);} catch (InterruptedException e) {log.error("addJob error", e);throw new BizException("add delay job error,reason:" + e.getMessage());} finally {if (lock != null) {lock.unlock();}}} /*** 删除job信息,为什么要删除job 信息?* 当我们确信上一个延时任务没有必要执行时,我们可以提前取消延时任务的执行。** @param jobDie 元信息*/@Overridepublic void deleteJob(JobDie jobDie) { RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());try {boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);if (!lockFlag) {throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());}String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());//从全局索引中删除任务。RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);jobPool.remove(topicId);//从zset 中删除任务RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);delayBucket.remove(topicId);} catch (InterruptedException e) {log.error("addJob error", e);throw new BizException("delete job error,reason:" + e.getMessage());} finally {if (lock != null) {lock.unlock();}}}}
搬运线程的目的是将 zset 中已经到期的任务搬运到消费队列中,消费队列中的任务会被消费线程消费。之所以会增加一个消费队列,是考虑到我们的消费能力和数据安全,如果消费能力比较弱,可能会造成消费线程阻塞,或者数据丢失。我们把到期任务放到一个阻塞队列中,可以让消费线程顺序消费。
这个地方还能继续优化,比如可以落库,可以建立多个阻塞队列,每个阻塞队列可以指定一个线程池进行消费等。
@Slf4j@Componentpublic class CarryJobScheduled { @Autowiredprivate RedissonClient redissonClient; /*** 启动定时开启搬运JOB信息*/@Scheduled(cron = "*/1 * * * * *")public void carryJobToQueue() {//System.out.println("carryJobToQueue --->");RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);try {boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);if (!lockFlag) {throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());}// 将到期的任务取出来RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);long now = System.currentTimeMillis();Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);if (CollectionUtils.isEmpty(jobCollection)) {return;} // 将到期的任务搬运到消费队列中,zset 中的任务删除List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE); if (CollectionUtils.isEmpty(jobList)) {return;}if (readyQueue.addAll(jobList)) {bucketSet.removeAllAsync(jobList);} } catch (InterruptedException e) {log.error("carryJobToQueue error", e);} finally {if (lock != null) {lock.unlock();}}}}
开启一个消费线程,消费线程会消费阻塞队列中的到期任务,其中 ConsumerService 可以采用策略模式,根据不同的topic 进行不同的业务处理:
@Slf4j@Componentpublic class ReadyQueueContext { @Autowiredprivate RedissonClient redissonClient; @Autowiredprivate ConsumerService consumerService; /*** TOPIC消费*/@PostConstructpublic void startTopicConsumer() {TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");} /*** 开启TOPIC消费线程* 将所有可能出现的异常全部catch住,确保While(true)能够不中断*/@SuppressWarnings("InfiniteLoopStatement")private void runTopicThreads() {while (true) {RLock lock = null;try {lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);} catch (Exception e) {log.error("runTopicThreads getLock error", e);}try {if (lock == null) {continue;}// 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);if (!lockFlag) {continue;} // 1. 获取ReadyQueue中待消费的数据RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);String topicId = queue.poll(60, TimeUnit.SECONDS);if (StringUtils.isEmpty(topicId)) {continue;} // 2. 获取job元信息内容RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);Job job = jobPoolMap.get(topicId); // 3. 消费FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());if (taskResult.get()) {// 3.1 消费成功,删除JobPool和DelayBucket的job信息jobPoolMap.remove(topicId);} else {/*** 重试次数为0就直接返回*/if (job.getRetry() == 0) {return;}int retrySum = job.getRetry() + 1;// 3.2 消费失败,则根据策略重新加入Bucket // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DBif (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {jobPoolMap.remove(topicId);continue;}job.setRetry(retrySum);long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);delayBucket.add(nextTime, topicId);// 3.3 更新元信息失败次数jobPoolMap.put(topicId, job);}} catch (Exception e) {log.error("runTopicThreads error", e);} finally {if (lock != null) {try {lock.unlock();} catch (Exception e) {log.error("runTopicThreads unlock error", e);}}}}}