11种延时任务实方式
延时任务主要应用于需要在特定时间或延迟一段时间后执行的操作场景,比如订单支付超时取消订单功能,又比如自动确定收货的功能等等。所以就来从实现到原理来盘点延迟任务的11种实现方式,这些方式并没有绝对的好坏之分,只是适用场景的不大相同。
DelayQueue
DelayQueue
是 JDK
提供的api
,是一个延迟队列。
DelayQueue泛型参数得实现Delayed接口,Delayed继承了Comparable接口。
getDelay
方法返回这个任务还剩多久时间可以执行,小于0的时候说明可以这个延迟任务到了执行的时间了。
compareTo
这个是对任务排序的,保证最先到延迟时间的任务排到队列的头。
DelayQueue 测试类
@Slf4j
public class DelayQueueJDKTest {
public static void main(String[] args) {
DelayQueue<CactusliTask> delayQueue = new DelayQueue();
new Thread(() -> {
while (true) {
try {
CactusliTask task = delayQueue.take();
log.info("获取到延迟任务:{}", task.getTaskContent());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
log.info("开始添加延迟任务");
delayQueue.offer(new CactusliTask("仙人球任务1,5秒", 5));
delayQueue.offer(new CactusliTask("仙人球任务2,3秒", 3));
delayQueue.offer(new CactusliTask("仙人球任务3,8秒", 8));
}
}
@Data
class CactusliTask implements Delayed {
private final String taskContent;
private final Long triggerTime;
public CactusliTask(String taskContent, long delayTime) {
this.taskContent = taskContent;
this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return this.triggerTime.compareTo(((CactusliTask) o).triggerTime);
}
}
CactusliTask 实现了Delayed接口,其中构造参数有:
- taskContent:延迟任务的具体的内容
- delayTime:延迟时间,秒为单位
测试结果
17:05:15.492 [main] INFO com.catusli.interview2.delayqueue.DelayQueueJDKTest -- 开始添加延迟任务
17:05:18.516 [Thread-0] INFO com.catusli.interview2.delayqueue.DelayQueueJDKTest -- 获取到延迟任务:仙人球任务2,3秒
17:05:20.511 [Thread-0] INFO com.catusli.interview2.delayqueue.DelayQueueJDKTest -- 获取到延迟任务:仙人球任务1,5秒
17:05:23.519 [Thread-0] INFO com.catusli.interview2.delayqueue.DelayQueueJDKTest -- 获取到延迟任务:仙人球任务3,8秒
至此基于 JDK 自带的 DelayQueue 成功实现了延时任务。
实现原理
offer
方法在提交任务的时候,会通过根据compareTo
的实现对任务进行排序,将最先需要被执行的任务放到队列头。
take
方法获取任务的时候,会拿到队列头部的元素,也就是队列中最早需要被执行的任务,通过getDelay返回值判断任务是否需要被立刻执行,如果需要的话,就返回任务,如果不需要就会等待这个任务到延迟时间的剩余时间,当时间到了就会将任务返回。
Timer
Timer 也是 JDK 中提供的一个用于安排任务在未来某个时间执行或重复执行的工具类。
Timer 测试类
@Slf4j
public class TimerDelayQueueJDKTest {
public static void main(String[] args) {
Timer timer = new Timer();
log.info("开始添加延迟任务");
timer.schedule(new TimerTask() {
@Override
public void run() {
log.info("执行仙人球任务1,6秒");
}
}, 6000);
}
}
测试结果
通过schedule
提交一个延迟时间为6s的延迟任务,实现效果如下:
D:\Public_software\Java_jdk\17\bin\java.exe
17:16:43.235 [main] INFO com.catusli.interview2.delayqueue.TimerDelayQueueJDKTest -- 开始添加延迟任务
17:16:49.247 [Timer-0] INFO com.catusli.interview2.delayqueue.TimerDelayQueueJDKTest -- 执行仙人球任务1,6秒
实现原理
提交的任务是一个TimerTask
:
TimerTask
内部有一个nextExecutionTime
属性,代表下一次任务执行的时间,在提交任务的时候会计算出nextExecutionTime
值。
Timer
内部有一个TaskQueue
对象,用来保存TimerTask
任务的,会根据nextExecutionTime
来排序,保证能够快速获取到最早需要被执行的延迟任务。
在Timer
内部还有一个执行任务的线程TimerThread
,这个线程就跟DelayQueue demo
中开启的线程作用是一样的,用来执行已经到延迟时间的任务。
所以总的来看,Time
r有点像整体封装了DelayQueue demo
中的所有东西,让用起来简单点。
虽然Timer用起来比较简单,但是在阿里规范中是不推荐使用的,主要是有以下几点原因:
- Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理
- Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,不安全
ScheduledThreadPoolExecutor
由于Timer
在使用上有一定的问题,所以在JDK1.5
版本的时候提供了ScheduledThreadPoolExecutor
,这个跟Time
r的作用差不多,并且他们的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor
解决了单线程和异常崩溃等问题。
测试类
@Slf4j
public class ScheduledThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());
log.info("提交延时任务");
executor.schedule(() -> {
log.info("执行任务1,延时5秒");
}, 5, TimeUnit.SECONDS);
}
}
测试结果
17:35:19.152 [main] INFO com.cactusli.interview2.delayqueue.ScheduledThreadPoolExecutorTest -- 提交延时任务
17:35:24.166 [pool-1-thread-1] INFO com.cactusli.interview2.delayqueue.ScheduledThreadPoolExecutorTest -- 执行任务1,延时5秒
实现原理
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,也就是继承了线程池,所以可以有很多个线程来执行任务。
ScheduledThreadPoolExecuto
r在构造的时候会传入一个DelayedWorkQueue
阻塞队列,所以线程池内部的阻塞队列是DelayedWorkQueue
。
在提交延迟任务的时候,任务会被封装一个任务会被封装成ScheduledFutureTask
对象,然后放到DelayedWorkQueue
阻塞队列中。
ScheduledFutureTask
实现了前面提到的Delayed接口,所以其实可以猜到DelayedWorkQueue
会根据ScheduledFutureTask
对于Delayed接口的实现来排序,所以线程能够获取到最早到延迟时间的任务。当线程从DelayedWorkQueue
中获取到需要执行的任务之后就会执行任务。
RocketMQ
RocketMQ
是阿里开源的一款消息中间件,其内部也是实现了延迟消息的功能,如果有对RocketMQ
不熟悉的小伙伴可以看一下我之前写的RocketMQ保姆级教程进行学习。
RocketMQ
延迟消息的延迟时间默认有18个等级。当发送消息的时候只需要指定延迟等级即可。如果这18个等级的延迟时间不符和你的要求,可以修改RocketMQ
服务端的配置文件。
当发送消息的时候只需要指定延迟等级即可。如果这18个等级的延迟时间不符和你的要求,可以修改RocketMQ服务端的配置文件。
RocketMQ 测试
在项目中引入依赖如下:
<!--SpringBoot与RocketMQ整合依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
</dependency>
在 application.properties 中配置:
# ========================RocketMQ=====================
rocketmq.producer.group=cactusliProducer
rocketmq.consumer.group=cactusliProducer
rocketmq.name-server=192.168.1.20:9876
创建Topic 和 消费者组
./bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t cactusliDelayTaskTopic
./bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g cactusliConsumer
新建 RocketMQProducerConfig
配置消息生产者:
@Configuration
public class RocketMQProducerConfig {
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.name-server}")
private String namesrvAddr;
@Bean
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer cactusliDelayTaskProducer = new DefaultMQProducer(producerGroup);
cactusliDelayTaskProducer.setNamesrvAddr(namesrvAddr);
try {
cactusliDelayTaskProducer.start();
} catch (Exception e) {
e.printStackTrace();
}
return cactusliDelayTaskProducer;
}
}
然后在新建 RocketMQConsumerConfig
配置消息消费者,监听cactusliDelayTaskTopic
的消息:
@Slf4j
@Configuration
public class RocketMQConsumerConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
try {
consumer.subscribe("cactusliDelayTaskTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
log.info("获取到延迟任务:{}", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
return consumer;
}
}
controller类,通过DefaultMQProducer发送延迟消息到`cactusliDelayTaskTopic这个topic,延迟等级为2,也就是延迟时间为5s的意思。
@RestController
@Slf4j
public class RocketMQDelayTaskController {
@Resource
private DefaultMQProducer producer;
@GetMapping("/rocketmq/add")
public void addTask(@RequestParam("task") String task) throws Exception {
Message msg = new Message("cactusliDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(2);
// 发送消息并得到消息的发送结果,然后打印
log.info("提交延迟任务");
producer.send(msg);
}
}
测试结果
启动应用,浏览器输入链接添加任务: http://192.168.1.218:24618/rocketmq/add?task=仙人球下发订单1
2024-08-20T09:54:39.738+08:00 INFO 21872 --- [Interview2] [io-24618-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2024-08-20T09:54:39.739+08:00 INFO 21872 --- [Interview2] [io-24618-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2024-08-20T09:54:39.761+08:00 INFO 21872 --- [Interview2] [io-24618-exec-1] c.c.i.c.RocketMQDelayTaskController : 提交延迟任务
2024-08-20T09:54:44.802+08:00 INFO 21872 --- [Interview2] [tusliConsumer_1] c.c.i.config.RocketMQConsumerConfig : 获取到延迟任务:仙人球下发订单1
实现原理
生产者发送延时消息之后,RocketMQ服务端在接收到消息之后,会去根据延迟级别是否大于0来判断是否是延迟消息。
- 如果不大于0,说明不是延迟消息,那就会将消息保存到指定的topic中
- 如果大于0,说明是延迟消息,此时RocketMQ会进行一波偷梁换柱的操作,将消息的topic改成
SCHEDULE_TOPIC_XXXX
中,XXXX不是占位符,然后存储。
在RocketMQ
内部有一个延迟任务,相当于是一个定时任务,这个任务就会获取SCHEDULE_TOPIC_XXXX
中的消息,判断消息是否到了延迟时间,如果到了,那么就会将消息的topic存储到原来真正的topic(拿我们的例子来说就是cactusliDelayTaskTopic
)中,之后消费者就可以从真正的topic中获取到消息了。
定时任务
RocketMQ这种实现方式相比于前面提到的三种更加可靠,因为前面提到的三种任务内容都是存在内存的,服务器重启任务就丢了,如果要实现任务不丢还得自己实现逻辑,但是RocketMQ消息有持久化机制,能够保证任务不丢失。
RabbitMQ
RabbitMQ也是一款消息中间件,通过RabbitMQ的死信队列也可以是先延迟任务的功能。
Docker 安装 RabbitMQ
docker pull rabbitmq:management docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
RabbitMQ 测试
在项目中引入RabbitMQ的依赖:
<!--SpringBoot与RabbitMQ整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.0</version>
</dependency
在 application.properties 中配置如下:
# ========================RabbitMQ =====================
spring.rabbitmq.host=192.168.1.20
spring.rabbitmq.port=15672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
RabbitMQ死信队列的配置类,后面会介绍原理
@Configuration
public class RabbitMQConfiguration {
@Bean
public DirectExchange cactusliDirectExchangee() {
return new DirectExchange("cactusliDirectExchangee");
}
@Bean
public Queue cactusliQueue() {
return QueueBuilder
// 队列名称, 持久化
.durable("cactusliQueue")
// 设置队列的过期时间, 也就是延迟任务的时间5s
.ttl(5000)
// 设置死信交换机
.deadLetterExchange("cactusliDelayTaskExchangee")
.build();
}
@Bean
public Binding cactusliQueueBinding() {
// 绑定队列到交换机
return BindingBuilder.bind(cactusliQueue())
.to(cactusliDirectExchangee())
.with("");
}
@Bean
public DirectExchange cactusliDelayTaskExchangee() {
return new DirectExchange("cactusliDelayTaskExchangee");
}
@Bean
public Queue cactusliDelayTaskQueue() {
return QueueBuilder
// 队列名称, 持久化
.durable("cactusliDelayTaskQueue")
.build();
}
@Bean
public Binding cactusliDelayTaskBinding() {
// 绑定队列到交换机
return BindingBuilder.bind(cactusliDelayTaskQueue())
.to(cactusliDelayTaskExchangee())
.with("");
}
}
RabbitMQDelayTaskController用来发送消息,这里没指定延迟时间,是因为在声明队列的时候指定了延迟时间为5s
@RestController
@Slf4j
public class RabbitMQDelayTaskController {
@Resource
private AmqpTemplate amqpTemplate;
@GetMapping("/rabbitmq/add")
public void addTask(@RequestParam("task") String task) throws Exception {
log.info("提交延迟任务");
// 发送消息
amqpTemplate.convertAndSend("cactusliDirectExchangee", "", task, message -> {
// 为消息设置一个唯一的ID
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
});
}
}
测试结果
启动应用,浏览器输入链接添加任务: http://192.168.1.218:24618/rabbitmq/add?task=仙人球下发订单3
2024-08-20T16:46:17.419+08:00 INFO 20996 --- [io-24618-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2024-08-20T16:46:17.443+08:00 INFO 20996 --- [io-24618-exec-1] c.c.i.c.RabbitMQDelayTaskController : 提交延迟任务
2024-08-20T16:46:22.461+08:00 INFO 20996 --- [ntContainer#0-1] c.c.i.d.r.RabbitMessageListener : 接收到消息: 仙人球下发订单1
2024-08-20T16:47:18.899+08:00 INFO 20996 --- [io-24618-exec-4] c.c.i.c.RabbitMQDelayTaskController : 提交延迟任务
2024-08-20T16:47:23.903+08:00 INFO 20996 --- [ntContainer#0-1] c.c.i.d.r.RabbitMessageListener : 接收到消息: 仙人球下发订单2
// 查看时间相差 5s 是正确的
2024-08-20T16:47:30.497+08:00 INFO 20996 --- [io-24618-exec-5] c.c.i.c.RabbitMQDelayTaskController : 提交延迟任务
2024-08-20T16:47:35.501+08:00 INFO 20996 --- [ntContainer#0-1] c.c.i.d.r.RabbitMessageListener : 接收到消息: 仙人球下发订单3
实现原理
整个工作流程如下:
- 消息发送的时候会将消息发送到
cactusliDirectExchange
这个交换机上 - 由于
cactusliDirectExchange
绑定了cactusliQueue
,所以消息会被路由到cactusliQueue
这个队列上 - 由于
cactusliQueue
没有消费者消费消息,并且又设置了5s的过期时间,所以当消息过期之后,消息就被放到绑定的cactusliDelayTaskExchangee
死信交换机中 - 消息到达
cactusliDelayTaskExchangee
交换机后,由于跟cactusliDelayTaskQueue
进行了绑定,所以消息就被路由到cactusliDelayTaskQueue
中,消费者就能从cactusliDelayTaskQueue
中拿到消息了
上面说的队列与交换机的绑定关系,就是上面的配置类所干的事。
其实从这个单从消息流转的角度可以看出,RabbitMQ跟RocketMQ实现有相似之处。
消息最开始都并没有放到最终消费者消费的队列中,而都是放到一个中间队列中,等消息到了过期时间或者说是到延迟时间,消息就会被放到最终的队列供消费者消息。
只不过RabbitMQ需要你显示的手动指定消息所在的中间队列,而RocketMQ是在内部已经做好了这块逻辑。
除了基于RabbitMQ的死信队列来做,RabbitMQ官方还提供了延时插件,也可以实现延迟消息的功能,这个插件的大致原理也跟上面说的一样,延时消息会被先保存在一个中间的地方,叫做Mnesia,然后有一个定时任务去查询最近需要被投递的消息,将其投递到目标队列中。
监听Redis过期key
在Redis中,使用发布订阅的机制来实现延迟任务。
Redis 测试
在项目中引入Redis的依赖:
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
在 application.properties 中配置:
spring.data.redis.database=0
spring.data.redis.host=192.168.1.18
spring.data.redis.port=6380
spring.data.redis.password=Nstr.@#$
spring.data.redis.lettuce.pool.max-active=8
spring.data.redis.lettuce.pool.max-wait=-1ms
spring.data.redis.lettuce.pool.max-idle=8
配置类
/**
* @param lettuceConnectionFactory
* @return redis序列化的工具配置类,下面这个请一定开启配置
* 127.0.0.1:6379> keys *
* 1) "ord:102" 序列化过
* 2) "\xac\xed\x00\x05t\x00\aord:102" 野生,没有序列化过
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式string
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式json
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
}
KeyExpirationEventMessageListener
实现了对__keyevent@*__:expired
channel的监听
当KeyExpirationEventMessageListener
收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent
事件
所以我们只需要监听RedisKeyExpiredEvent
事件就可以拿到过期消息的Key,也就是延迟消息。
对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener
@Component
public class RedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {
@Override
public void onApplicationEvent(RedisKeyExpiredEvent event) {
byte[] body = event.getSource();
System.out.println("获取到延迟消息:" + new String(body));
}
}
controller类,通过stringRedisTemplate
向 Redis Server 中设置一个 Key (键), key 的有效时间是 5s。
@RestController
@Slf4j
public class RedisDelayTaskController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@GetMapping("/redis/add")
public void addTask(@RequestParam("task") String task) throws Exception {
stringRedisTemplate.opsForValue().set("cactusliDelayTask", task,5, TimeUnit.SECONDS);
// 发送消息并得到消息的发送结果,然后打印
log.info("提交延迟任务");
}
}
测试结果
启动应用,浏览器输入链接添加任务: http://192.168.1.218:24618/redis/addtask=仙人球下发订单3
成功获取到延迟任务
2024-08-21T10:30:34.030+08:00 INFO 3612 --- [io-24618-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2024-08-21T10:30:34.061+08:00 INFO 3612 --- [io-24618-exec-1] c.c.i.c.RedisDelayTaskController : 提交延迟任务
2024-08-21T10:30:39.137+08:00 INFO 3612 --- [enerContainer-1] c.c.i.d.l.RedisKeyExpiredEventListener : 获取到延迟消息:cactusliDelayTask
2024-08-21T10:30:42.721+08:00 INFO 3612 --- [io-24618-exec-2] c.c.i.c.RedisDelayTaskController : 提交延迟任务
2024-08-21T10:30:47.762+08:00 INFO 3612 --- [enerContainer-2] c.c.i.d.l.RedisKeyExpiredEventListener : 获取到延迟消息:cactusliDelayTask
虽然这种方式可以实现延迟任务,但是这种方式坑比较多
任务存在延迟
Redis过期事件的发布不是指key到了过期时间就发布,而是key到了过期时间被清除之后才会发布事件。
而Redis过期key的两种清除策略,就是面试八股文常背的两种:
- 惰性清除。当这个key过期之后,访问时,这个Key才会被清除
- 定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除
所以即使key到了过期时间,Redis也不一定会发送key过期事件,这就到导致虽然延迟任务到了延迟时间也可能获取不到延迟任务。
丢消息太频繁
Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。
所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。
消息消费只有广播模式
Redis的发布订阅模式消息消费只有广播模式一种。
所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。
如图所示,生产者发布了一条消息,那么两个消费者都可以同时收到这条消息。所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。
接收到所有key的某个事件
这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。
当监听了__keyevent@<db>__:expired
的channel,那么所有的Redis的key只要发生了过期事件都会被通知给消费者,不管这个key是不是消费者想接收到的。
所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。
实现原理
生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。图中channel理解成MQ中的topic。
并且在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。这里面就有这么一个channel叫做__keyevent@<db>__:expired
,db是指Redis数据库的序号。
当某个Redis的key过期之后,Redis内部会发布一个事件到__keyevent@<db>__:expired
这个channel上,只要监听这个事件,那么就可以获取到过期的key。
所以基于监听Redis过期key实现延迟任务的原理如下:
- 将延迟任务作为key,过期时间设置为延迟时间
- 监听
__keyevent@<db>__:expired
这个channel,那么一旦延迟任务到了过期时间(延迟时间),那么就可以获取到这个任务
Redisson的RDelayedQueue
Redisson 是一个基于 Java 的 Redis 客户端,其主要目的是简化在 Java 应用程序中与 Redis 交互的过程。Redisson 提供了许多高级的分布式数据结构和服务,这些都建立在 Redis 的基础之上。并基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。
Redisson 测试
在项目中引入redisson 的依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.18.0</version>
</dependency>
封装了一个RedissonDelayQueue类
package com.cactusli.interview2.delayqueue.listener;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedissonDelayQueue {
private RedissonClient redissonClient;
private RDelayedQueue<String> delayQueue;
private RBlockingQueue<String> blockingQueue;
@PostConstruct
public void init() {
initDelayQueue();
startDelayQueueConsumer();
}
private void initDelayQueue() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress("redis://192.168.1.18:6380");
serverConfig.setPassword("Nstr.");
serverConfig.setDatabase(0);
redissonClient = Redisson.create(config);
blockingQueue = redissonClient.getBlockingQueue("CACTUSLI");
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
}
private void startDelayQueueConsumer() {
new Thread(() -> {
while (true) {
try {
String task = blockingQueue.take();
log.info("接收到延迟任务:{}", task);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "SANYOU-Consumer").start();
}
public void offerTask(String task, long seconds) {
log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
delayQueue.offer(task, seconds, TimeUnit.SECONDS);
}
}
这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫CACTUSLI,这个名字无所谓。
当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。
添加任务的时候是通过RDelayedQueue的offer方法添加的。
controller类,通过接口添加任务,延迟时间为5s
@RestController
public class RedissonDelayQueueController {
@Resource
private RedissonDelayQueue redissonDelayQueue;
@GetMapping("/redisson/add")
public void addTask(@RequestParam("task") String task) {
redissonDelayQueue.offerTask(task, 5);
}
}
测试结果
启动应用,浏览器输入链接添加任务: http://192.168.1.218:24618/redisson/add?task==仙人球下发订单3
静静等待5s,成功获取到任务。
2024-08-22T16:39:17.973+08:00 INFO 19296 --- [io-24618-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2024-08-22T16:39:18.024+08:00 INFO 19296 --- [io-24618-exec-1] c.c.i.d.listener.RedissonDelayQueue : 添加延迟任务:仙人球下发订单1 延迟时间:5s
2024-08-22T16:39:23.078+08:00 INFO 19296 --- [SANYOU-Consumer] c.c.i.d.listener.RedissonDelayQueue : 接收到延迟任务:仙人球下发订单1
2024-08-22T16:39:32.599+08:00 INFO 19296 --- [io-24618-exec-2] c.c.i.d.listener.RedissonDelayQueue : 添加延迟任务:仙人球下发订单2 延迟时间:5s
2024-08-22T16:39:37.676+08:00 INFO 19296 --- [SANYOU-Consumer] c.c.i.d.listener.RedissonDelayQueue : 接收到延迟任务:仙人球下发订单2
实现原理
如下是Redisson延迟队列的实现原理
CACTUSLI前面的前缀都是固定的,Redisson创建的时候会拼上前缀。
redisson_delay_queue_timeout:CACTUSLI
,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要redisson_delay_queue:CACTUSLI
,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。CACTUSLI
,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的redisson_delay_queue_channel:CACTUSLI
,是一个channel,用来通知客户端开启一个延迟任务
任务提交的时候,Redisson会将任务放到redisson_delay_queue_timeout:CACTUSLI
中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳
Redisson客户端内部通过监听redisson_delay_queue_channel:CACTUSLI
这个channel来提交一个延迟任务,这个延迟任务能够保证将redisson_delay_queue_timeout:CACTUSLI
中到了延迟时间的任务从redisson_delay_queue_timeout:CACTUSLI
中移除,存到CACTUSLI
这个目标队列中。
于是消费者就可以从CACTUSLI
这个目标队列获取到延迟任务了。
所以从这可以看出,Redisson的延迟任务的实现跟前面说的MQ的实现都是殊途同归,最开始任务放到中间的一个地方,叫做redisson_delay_queue_timeout:CACTUSLI
,然后会开启一个类似于定时任务的一个东西,去判断这个中间地方的消息是否到了延迟时间,到了再放到最终的目标的队列供消费者消费。
Redisson的这种实现方式比监听Redis过期key的实现方式更加可靠,因为消息都存在list和sorted set数据类型中,所以消息很少丢。
Netty的HashedWheelTimer
Netty测试
@Slf4j
public class NettyHashedWheelTimerDemo {
public static void main(String[] args) {
HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
timer.start();
log.info("提交延迟任务");
timer.newTimeout(timeout -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);
}
}
测试结果
17:49:44.965 [main] INFO com.cactusli.interview2.delayqueue.NettyHashedWheelTimerDemo -- 提交延迟任务
17:49:50.070 [pool-1-thread-1] INFO com.cactusli.interview2.delayqueue.NettyHashedWheelTimerDemo -- 执行延迟任务
实现原理
如图,时间轮会被分成很多格子(上述demo中的8就代表了8个格子),一个格子代表一段时间(上述demo中的100就代表一个格子是100ms),所以上述demo中,每800ms会走一圈。
当任务提交的之后,会根据任务的到期时间进行hash取模,计算出这个任务的执行时间所在具体的格子,然后添加到这个格子中,如果这个格子有多个任务,会用链表来保存。所以这个任务的添加有点像HashMap储存元素的原理。
HashedWheelTimer内部会开启一个线程,轮询每个格子,找到到了延迟时间的任务,然后执行。
由于HashedWheelTimer也是单线程来处理任务,所以跟Timer一样,长时间运行的任务会导致其他任务的延时处理。
前面Redisson中提到的客户端延迟任务就是基于Netty的HashedWheelTimer实现的。
Hutool的SystemTimer
Hutool工具类也提供了延迟任务的实现SystemTimer。
Hutool测试
@Slf4j
public class SystemTimerDemo {
public static void main(String[] args) {
SystemTimer systemTimer = new SystemTimer();
systemTimer.start();
log.info("提交延迟任务");
systemTimer.addTask(new TimerTask(() -> log.info("执行延迟任务"), 5000));
}
}
测试结果
17:58:09.752 [main] INFO com.cactusli.interview2.delayqueue.SystemTimerDemo -- 提交延迟任务
17:58:14.755 [pool-2-thread-1] INFO com.cactusli.interview2.delayqueue.SystemTimerDemo -- 执行延迟任务
实现原理
Hutool底层其实也用到了时间轮。
Qurtaz
Qurtaz是一款开源作业调度框架,基于Qurtaz提供的api也可以实现延迟任务的功能。
Qurtaz 测试
在 pom.xml 中添加依赖:
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
CactusliJob实现Job接口,当任务到达执行时间的时候会调用execute的实现,从context可以获取到任务的内容:
@Slf4j
public class CactusliJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
log.info("获取到延迟任务:{}", jobDataMap.get("delayTask"));
}
}
创建 CactusliJobQuartzDemo,执行定时任务。
@Slf4j
public class CactusliJobQuartzDemo {
public static void main(String[] args) throws SchedulerException, InterruptedException {
// 1.创建Scheduler的工厂
SchedulerFactory sf = new StdSchedulerFactory();
// 2.从工厂中获取调度器实例
Scheduler scheduler = sf.getScheduler();
// 6.启动 调度器
scheduler.start();
// 3.创建JobDetail,Job类型就是上面说的CactusliJob
JobDetail jb = JobBuilder.newJob(CactusliJob.class)
.usingJobData("delayTask", "这是一个延迟任务")
.build();
// 4.创建Trigger
Trigger t = TriggerBuilder.newTrigger()
//任务的触发时间就是延迟任务到的延迟时间
.startAt(DateUtil.offsetSecond(new Date(), 5))
.build();
// 5.注册任务和定时器
log.info("提交延迟任务");
scheduler.scheduleJob(jb, t);
}
}
测试结果
18:04:42.293 [main] INFO org.quartz.impl.StdSchedulerFactory -- Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
18:04:42.293 [main] INFO org.quartz.impl.StdSchedulerFactory -- Quartz scheduler version: 2.3.2
18:04:42.293 [main] INFO org.quartz.core.QuartzScheduler -- Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
18:04:42.369 [main] INFO com.cactusli.interview2.delayqueue.CactusliJobQuartzDemo -- 提交延迟任务
18:04:47.358 [DefaultQuartzScheduler_Worker-1] INFO com.cactusli.interview2.delayqueue.CactusliJob -- 获取到延迟任务:这是一个延迟任务
实现原理
Quartz 中核心组件:
- Job:表示一个任务,execute方法的实现是对任务的执行逻辑
- JobDetail:任务的详情,可以设置任务需要的参数等信息
- Trigger:触发器,是用来触发业务的执行,比如说指定5s后触发任务,那么任务就会在5s后触发
- Scheduler:调度器,内部可以注册多个任务和对应任务的触发器,之后会调度任务的执行
启动的时候会开启一个QuartzSchedulerThread调度线程,这个线程会去判断任务是否到了执行时间,到的话就将任务交给任务线程池去执行。
无限轮询延迟任务
无限轮询的意思就是开启一个线程不停的去轮询任务,当这些任务到达了延迟时间,那么就执行任务。
测试
@Slf4j
public class PollingTaskDemo {
private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
for (DelayTask delayTask : DELAY_TASK_LIST) {
if (delayTask.triggerTime <= System.currentTimeMillis()) {
log.info("处理延迟任务:{}", delayTask.taskContent);
DELAY_TASK_LIST.remove(delayTask);
}
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
}
}
}).start();
log.info("提交延迟任务");
DELAY_TASK_LIST.add(new DelayTask("知识库:https://cactusli.net/ ", 5L));
}
@Getter
@Setter
public static class DelayTask {
private final String taskContent;
private final Long triggerTime;
public DelayTask(String taskContent, Long delayTime) {
this.taskContent = taskContent;
this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
}
}
}
任务可以存在数据库又或者是内存,看具体的需求,这里我为了简单就放在内存里了。
测试结果
18:39:08.194 [main] INFO com.cactusli.interview2.delayqueue.PollingTaskDemo -- 提交延迟任务
18:39:13.201 [Thread-0] INFO com.cactusli.interview2.delayqueue.PollingTaskDemo -- 处理延迟任务:知识库:https://cactusli.net/
这种操作简单,但是就是效率低下,每次都得遍历所有的任务。