如何实现一个延时队列 ?

作者:田燕青  职位:后期工程师

一、什么是延时队列

延时队列,顾名思义,就是元素在入队列时,会指定一个延时时间,期望在经过指定时间后再处理该元素。

二、延时队列适用的场景?

延时队列适用的场景有很多:

2.1比如超时关单:

即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。

2.2 比如回调重试:

对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min  5 min  10 min  30 min  1hour 等间隔进行回调。

2.3 会议提醒:

比如我们用的lark会议,在会议开始前10分钟对参会人进行提醒,这个功能也可以采用延时队列来实现。

2.4 各种延时提醒:

比如用户下单未支付时,系统在关单前10分钟提醒用户去支付。比如我曾做过的二手车的一个需求就是提醒买手尽快出价等。

这些场景都用到了延时队列,其实上述场景采用定时任务也能实现,但是相比于定时任务,延时队的时间把控更精准,延时队列不用扫描库表,对系统消耗更少。

三、延时队列的实现方式?

•DelayQueue这个是jdk自带的一种延时队列,位于java.util.concurrent 包下,它是一个有界的阻塞队列,它内部封装了一个 PriorityQueue(优先队列)

PriorityQueue 内部使用完全二叉堆来实现队列元素排序,当向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。有兴趣的同学可以详细查看源码。

•Quartz 定时任务,对时间精准度要求不高,数据量较小的任务,可以采用定时任务替代延时队列。

•redis 过期回调,我们可以开启监听key 是否过期的事件,一旦key 过期会触发一个callback 事件。这样我们就能通过设置key的过期时间,来实现延时队列的效果。

•redis  sorted set,主要是利用 zset 的score 属性,我们将延时时间转成(当前时间+延时时间)(时间单位毫秒)作为scroe 属性。然后开启一个消费线程轮训redis队列,当score属性的值小于当前时间时,证明延时消息到期,可以进行消费。

•Mq,通过rabbitMq  或 rocketMq 可以实现延时队列,具体实现方式省略。

•时间轮,基于kafka 或 netty 的时间轮算法来实现延时队列,实现方式省略,有兴趣的同学可以自己查看。

实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对qps稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。

我在项目中遇到一个使用延时队列的场景,因为项目中使用了redis,所以我用redis 实现了一个延时队列来满足需求,下面介绍下如何用redis 实现一个延时队列。

四、如何用redis实现延时队列?

通过 redis 实现延时队列有两种方式,第一种是redis 的过期回调,但是这种方式需要修改redis 的配置文件并重启服务,这对于我们正在使用的 redis 服务来说比较困难。

所以我们用第二种方式来实现一个延时队列:

4.1 我们使用redisson 来实现,首先引入redisson 包

 

 

4.2 配置redisson

 

 

4.3延时服务:添加延时任务,取消延时任务

  1. public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
  2.  
  3. @Autowired
  4. private RedissonClient redissonClient;
  5.  
  6. /**
  7. * 新增一个延时任务,添加job元信息
  8. *
  9. * @param job 元信息,job中包含 taskId 主键,topicId 队列id,retryNum 重试次数,delaytime 延时时间 等,此处不展开。
  10. */
  11. @Override
  12. public void addJob(Job job) {
  13. //添加分布式锁,防止重复添加任务
  14. RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
  15. try {
  16. boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
  17. if (!lockFlag) {
  18. throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
  19. }
  20. String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
  21.  
  22. // 1. 将job添加到 JobPool中,jobpool 作为一个全局索引,所有未执行任务都存在jobPool 中。
  23. RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
  24. if (jobPool.get(topicId) != null) {
  25. throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
  26. }
  27.  
  28. jobPool.put(topicId, job);
  29.  
  30. // 2. 将job添加到 DelayBucket中,按延时时间进行排序
  31. RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
  32. delayBucket.add(job.getDelay(), topicId);
  33. } catch (InterruptedException e) {
  34. log.error("addJob error", e);
  35. throw new BizException("add delay job error,reason:" + e.getMessage());
  36. } finally {
  37. if (lock != null) {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42.  
  43.  
  44. /**
  45. * 删除job信息,为什么要删除job 信息?
  46. * 当我们确信上一个延时任务没有必要执行时,我们可以提前取消延时任务的执行。
  47. *
  48. * @param jobDie 元信息
  49. */
  50. @Override
  51. public void deleteJob(JobDie jobDie) {
  52.  
  53. RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
  54. try {
  55. boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
  56. if (!lockFlag) {
  57. throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
  58. }
  59. String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
  60. //从全局索引中删除任务。
  61. RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
  62. jobPool.remove(topicId);
  63. //从zset 中删除任务
  64. RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
  65. delayBucket.remove(topicId);
  66. } catch (InterruptedException e) {
  67. log.error("addJob error", e);
  68. throw new BizException("delete job error,reason:" + e.getMessage());
  69. } finally {
  70. if (lock != null) {
  71. lock.unlock();
  72. }
  73. }
  74. }
  75. }

 

4.4搬运线程:

搬运线程的目的是将 zset 中已经到期的任务搬运到消费队列中,消费队列中的任务会被消费线程消费。之所以会增加一个消费队列,是考虑到我们的消费能力和数据安全,如果消费能力比较弱,可能会造成消费线程阻塞,或者数据丢失。我们把到期任务放到一个阻塞队列中,可以让消费线程顺序消费。

 

这个地方还能继续优化,比如可以落库,可以建立多个阻塞队列,每个阻塞队列可以指定一个线程池进行消费等。

  1. @Slf4j
  2. @Component
  3. public class CarryJobScheduled {
  4.  
  5. @Autowired
  6. private RedissonClient redissonClient;
  7.  
  8. /**
  9. * 启动定时开启搬运JOB信息
  10. */
  11. @Scheduled(cron = "*/1 * * * * *")
  12. public void carryJobToQueue() {
  13. //System.out.println("carryJobToQueue --->");
  14. RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
  15. try {
  16. boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
  17. if (!lockFlag) {
  18. throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
  19. }
  20. // 将到期的任务取出来
  21. RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
  22. long now = System.currentTimeMillis();
  23. Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
  24. if (CollectionUtils.isEmpty(jobCollection)) {
  25. return;
  26. }
  27.  
  28. // 将到期的任务搬运到消费队列中,zset 中的任务删除
  29. List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
  30. RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
  31.  
  32. if (CollectionUtils.isEmpty(jobList)) {
  33. return;
  34. }
  35. if (readyQueue.addAll(jobList)) {
  36. bucketSet.removeAllAsync(jobList);
  37. }
  38.  
  39. } catch (InterruptedException e) {
  40. log.error("carryJobToQueue error", e);
  41. } finally {
  42. if (lock != null) {
  43. lock.unlock();
  44. }
  45. }
  46. }
  47. }

 

4.5消费线程

开启一个消费线程,消费线程会消费阻塞队列中的到期任务,其中 ConsumerService 可以采用策略模式,根据不同的topic 进行不同的业务处理:

  1. @Slf4j
  2. @Component
  3. public class ReadyQueueContext {
  4.  
  5. @Autowired
  6. private RedissonClient redissonClient;
  7.  
  8. @Autowired
  9. private ConsumerService consumerService;
  10.  
  11. /**
  12. * TOPIC消费
  13. */
  14. @PostConstruct
  15. public void startTopicConsumer() {
  16. TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
  17. }
  18.  
  19. /**
  20. * 开启TOPIC消费线程
  21. * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
  22. */
  23. @SuppressWarnings("InfiniteLoopStatement")
  24. private void runTopicThreads() {
  25. while (true) {
  26. RLock lock = null;
  27. try {
  28. lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
  29. } catch (Exception e) {
  30. log.error("runTopicThreads getLock error", e);
  31. }
  32. try {
  33. if (lock == null) {
  34. continue;
  35. }
  36. // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
  37. boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
  38. if (!lockFlag) {
  39. continue;
  40. }
  41.  
  42. // 1. 获取ReadyQueue中待消费的数据
  43. RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
  44. String topicId = queue.poll(60, TimeUnit.SECONDS);
  45. if (StringUtils.isEmpty(topicId)) {
  46. continue;
  47. }
  48.  
  49. // 2. 获取job元信息内容
  50. RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
  51. Job job = jobPoolMap.get(topicId);
  52.  
  53. // 3. 消费
  54. FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
  55. if (taskResult.get()) {
  56. // 3.1 消费成功,删除JobPool和DelayBucket的job信息
  57. jobPoolMap.remove(topicId);
  58. } else {
  59. /**
  60. * 重试次数为0就直接返回
  61. */
  62. if (job.getRetry() == 0) {
  63. return;
  64. }
  65. int retrySum = job.getRetry() + 1;
  66. // 3.2 消费失败,则根据策略重新加入Bucket
  67.  
  68. // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
  69. if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
  70. jobPoolMap.remove(topicId);
  71. continue;
  72. }
  73. job.setRetry(retrySum);
  74. long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
  75. // log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
  76. RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
  77. delayBucket.add(nextTime, topicId);
  78. // 3.3 更新元信息失败次数
  79. jobPoolMap.put(topicId, job);
  80. }
  81. } catch (Exception e) {
  82. log.error("runTopicThreads error", e);
  83. } finally {
  84. if (lock != null) {
  85. try {
  86. lock.unlock();
  87. } catch (Exception e) {
  88. log.error("runTopicThreads unlock error", e);
  89. }
  90. }
  91. }
  92. }
  93. }

 

 

 

Back to Blog
Contact Icon
Message Icon
Back To Top