延迟任务多种实现姿势--上
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了延迟任务多种实现姿势--上相关的知识,希望对你有一定的参考价值。
延迟任务多种实现姿势--上
关于延迟任务的所有代码实现均存放在下面这个仓库中:
https://gitee.com/DaHuYuXiXi/deley-task
代码仓库中的源码经过后续更新后,对相关实现进行了抽象,因此会与本文展示的代码有稍许不同
什么是延迟任务
例如:pdd下单,但是没有付款,那么24小时候,订单会自动取消。收货后,如果一直不进行确认,那么默认七天后自动确认收货等等。
上面这些场景是我们平日中一直都会遇到的,作为程序员的我们,有没有考虑过该怎么实现这些延迟任务呢?
一,最简单的延迟队列实现
DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。
- BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常。
- 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容。
DelayQueue实现了BlockingQueue接口,所以具有无界、阻塞的特点,除此之外它自己的核心特点就是:
- 放入该队列的延时任务对象,只要到达延时时间之后才能被取到。
- DelayQueue 不接受null元素。
- DelayQueue 只接受那些实现了java.util.concurrent.Delayed接口的对象。
订单延迟任务实现
package com.delayTask.delayQueue;
import com.delayTask.domain.Order;
import lombok.Data;
import lombok.ToString;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时订单任务
*
* @author zdh
*/
@ToString
@Data
public class OrderDelayObject implements Delayed
/**
* 延迟任务唯一标识: 这里默认为当前时间戳
*/
private Long id;
/**
* 延时时间
*/
private long delayTime;
/**
* 订单对象
*/
private Order order;
public OrderDelayObject(long delayTime, Order order)
this.id = System.currentTimeMillis();
//延时时间加上当前时间
this.delayTime = System.currentTimeMillis() + delayTime;
this.order = order;
/**
* 延迟任务是否到期
*/
@Override
public long getDelay(TimeUnit unit)
long diff = delayTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
/**
* 延时任务队列,按照延时时间元素排序,实现Comparable接口
*/
@Override
public int compareTo(Delayed obj)
return Long.compare(this.delayTime, ((OrderDelayObject) obj).delayTime);
- Delayed接口继承Comparable接口,所以需要实现compareTo方法,用于延时任务在队列中按照“延时时间”进行排序。
- getDelay方法是Delayed接口方法,实现该方法提供获取延时任务的倒计时时间
订单处理
package com.dhy.delayQueue;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.delayQueue.OrderDelayObject;
import com.delayTask.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class DelayQueueTest
/**
* 延迟队列
*/
private final DelayQueue<OrderDelayObject> delayQueue = new DelayQueue<>();
/**
* 开启线程不断轮询,看是否有延迟任务可以处理
*/
@BeforeTest
public void beforeTest()
Executors.newSingleThreadExecutor().execute(() ->
try
while (true)
//阻塞直到获取到某个到时的延迟任务
OrderDelayObject delayObject = delayQueue.take();
log.info("延迟任务信息如下: ",delayObject);
Order order = delayObject.getOrder();
order.cancelOrderByTimeEnd();
catch (InterruptedException e)
e.printStackTrace();
);
/**
* 测试下单
*/
@Test
public void testOrder() throws InterruptedException
OrderDelayObject orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
delayQueue.add(orderDelay);
OrderDelayObject orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
delayQueue.add(orderDelay1);
Thread.sleep(TimeUnit.SECONDS.toMillis(8L));
orderDelay.getOrder().submitOrder();
delayQueue.remove(orderDelay);
//防止程序结束
Thread.sleep(TimeUnit.MINUTES.toMillis(10L));
优缺点
使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。
它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
优化点
上图中我们使用的是while-true循环同步顺序的处理延迟任务:
这里建议将订单处理的业务逻辑放到单独一个线程池中进行处理,而非在这里同步进行处理,因为这样可能会导致部分到期的延迟任务无法得到及时的处理。
二,上点档次,基于Netty时间轮算法实现
时间轮算法
时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:
- 时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。
- 时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration
- Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」
订单延迟任务实现
这里商品订单到时取消对时间精确度的要求并不是特别高,因此可以选择采用时间轮算法进行处理。
首先通过maven坐标引入netty
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
- 对netty时间轮的使用,进行一层简单的封装
package com.delayTask.wheelTimer;
import com.delayTask.DelayTaskEvent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* <h>
* 时间轮工厂
* </h>
* @author 大忽悠
* @create 2022/9/17 17:17
*/
public class WheelTimerHelper
/**
* 处理订单任务的线程池
*/
private static final ExecutorService THREAD_POOL= Executors.newCachedThreadPool();
/**
* 时间轮
*/
private static HashedWheelTimer wheelTimer;
/**
* 生产一个时间轮,默认的bucket数量为512个
*/
public static HashedWheelTimer newWheelTimer(Long duration)
wheelTimer=new HashedWheelTimer(duration, TimeUnit.MILLISECONDS, 512);
return wheelTimer;
/**
* @param delayTaskEvent 延迟任务事件
*/
public static Timeout addNewTask(DelayTaskEvent delayTaskEvent)
//延迟任务,延迟时间,时间单位
return wheelTimer.newTimeout(delayTask ->
delayTaskEvent.handleDelayEvent();
, delayTaskEvent.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
- 测试
package com.dhy.wheelTimer;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.wheelTimer.WheelTimerHelper;
import io.netty.util.Timeout;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
/**
* @author 大忽悠
* @create 2022/9/17 17:19
*/
public class WheelTimerTest
@BeforeTest
public void beforeTest()
WheelTimerHelper.newWheelTimer(100L);
@Test
public void testWheelTimer() throws InterruptedException
OrderDelayEvent orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
Timeout timeout = WheelTimerHelper.addNewTask(orderDelay);
Timeout timeout1 = WheelTimerHelper.addNewTask(orderDelay1);
//订单二在到期前成功结算,因此不需要取消
orderDelay1.getOrder().submitOrder();
//取消延迟任务二
timeout1.cancel();
//阻塞,防止程序结束
Thread.sleep(TimeUnit.SECONDS.toMillis(100L));
详细代码实现,可以fork仓库看源码
优缺点
时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务。
小结
本文主要对延迟任务基于内存的单体应用实现给出了两种解决策略,下一篇文章中,我们将针对基于内存的单体解决方法缺陷,给出基于redis和mq实现介绍。
以上是关于延迟任务多种实现姿势--上的主要内容,如果未能解决你的问题,请参考以下文章