延迟任务多种实现姿势--上

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了延迟任务多种实现姿势--上相关的知识,希望对你有一定的参考价值。

延迟任务多种实现姿势--上


关于延迟任务的所有代码实现均存放在下面这个仓库中:

https://gitee.com/DaHuYuXiXi/deley-task

代码仓库中的源码经过后续更新后,对相关实现进行了抽象,因此会与本文展示的代码有稍许不同


什么是延迟任务

例如:pdd下单,但是没有付款,那么24小时候,订单会自动取消。收货后,如果一直不进行确认,那么默认七天后自动确认收货等等。

上面这些场景是我们平日中一直都会遇到的,作为程序员的我们,有没有考虑过该怎么实现这些延迟任务呢?


一,最简单的延迟队列实现

DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

  • BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常。
  • 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容。


DelayQueue实现了BlockingQueue接口,所以具有无界、阻塞的特点,除此之外它自己的核心特点就是:

  • 放入该队列的延时任务对象,只要到达延时时间之后才能被取到。
  • DelayQueue 不接受null元素。
  • DelayQueue 只接受那些实现了java.util.concurrent.Delayed接口的对象。

订单延迟任务实现

package com.delayTask.delayQueue;


import com.delayTask.domain.Order;
import lombok.Data;
import lombok.ToString;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 延时订单任务
 *
 * @author zdh
 */
@ToString
@Data
public class OrderDelayObject implements Delayed 

    /**
     * 延迟任务唯一标识: 这里默认为当前时间戳
     */
    private Long id;

    /**
     * 延时时间
     */
    private long delayTime;

    /**
     * 订单对象
     */
    private Order order;

    public OrderDelayObject(long delayTime, Order order) 
        this.id = System.currentTimeMillis();
        //延时时间加上当前时间
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.order = order;
    


    /**
     * 延迟任务是否到期
     */
    @Override
    public long getDelay(TimeUnit unit) 
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    

    /**
     * 延时任务队列,按照延时时间元素排序,实现Comparable接口
     */
    @Override
    public int compareTo(Delayed obj) 
        return Long.compare(this.delayTime, ((OrderDelayObject) obj).delayTime);
    
 
  • Delayed接口继承Comparable接口,所以需要实现compareTo方法,用于延时任务在队列中按照“延时时间”进行排序。
  • getDelay方法是Delayed接口方法,实现该方法提供获取延时任务的倒计时时间

订单处理

package com.dhy.delayQueue;


import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.delayQueue.OrderDelayObject;
import com.delayTask.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DelayQueueTest 
    /**
     * 延迟队列
     */
    private final DelayQueue<OrderDelayObject> delayQueue = new DelayQueue<>();

    /**
     * 开启线程不断轮询,看是否有延迟任务可以处理
     */
    @BeforeTest
    public void beforeTest() 
        Executors.newSingleThreadExecutor().execute(() -> 
            try 
                while (true) 
                    //阻塞直到获取到某个到时的延迟任务
                    OrderDelayObject delayObject = delayQueue.take();
                    log.info("延迟任务信息如下: ",delayObject);
                    Order order = delayObject.getOrder();
                    order.cancelOrderByTimeEnd();
                
             catch (InterruptedException e) 
                e.printStackTrace();
            
        );
    

    /**
     * 测试下单
     */
    @Test
    public void testOrder() throws InterruptedException 
        OrderDelayObject orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
        delayQueue.add(orderDelay);

        OrderDelayObject orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
        delayQueue.add(orderDelay1);

        Thread.sleep(TimeUnit.SECONDS.toMillis(8L));

        orderDelay.getOrder().submitOrder();
        delayQueue.remove(orderDelay);

        //防止程序结束
        Thread.sleep(TimeUnit.MINUTES.toMillis(10L));
    





优缺点

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。


优化点

上图中我们使用的是while-true循环同步顺序的处理延迟任务:


这里建议将订单处理的业务逻辑放到单独一个线程池中进行处理,而非在这里同步进行处理,因为这样可能会导致部分到期的延迟任务无法得到及时的处理。


二,上点档次,基于Netty时间轮算法实现

时间轮算法


时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

  • 时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。
  • 时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration
  • Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

更加详细介绍,可以参考此篇文章


订单延迟任务实现

这里商品订单到时取消对时间精确度的要求并不是特别高,因此可以选择采用时间轮算法进行处理。

首先通过maven坐标引入netty

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>
  • 对netty时间轮的使用,进行一层简单的封装
package com.delayTask.wheelTimer;

import com.delayTask.DelayTaskEvent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * <h>
 *     时间轮工厂
 * </h>
 * @author 大忽悠
 * @create 2022/9/17 17:17
 */
public class WheelTimerHelper 
    /**
     * 处理订单任务的线程池
     */
    private static final ExecutorService THREAD_POOL= Executors.newCachedThreadPool();

    /**
     * 时间轮
     */
    private static HashedWheelTimer wheelTimer;

    /**
     * 生产一个时间轮,默认的bucket数量为512个
     */
    public static HashedWheelTimer newWheelTimer(Long duration)
        wheelTimer=new HashedWheelTimer(duration, TimeUnit.MILLISECONDS, 512);
        return wheelTimer;
    

    /**
     * @param delayTaskEvent 延迟任务事件
     */
    public static Timeout addNewTask(DelayTaskEvent delayTaskEvent)
        //延迟任务,延迟时间,时间单位
        return wheelTimer.newTimeout(delayTask -> 
            delayTaskEvent.handleDelayEvent();
        , delayTaskEvent.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    

  • 测试
package com.dhy.wheelTimer;

import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.wheelTimer.WheelTimerHelper;
import io.netty.util.Timeout;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

/**
 * @author 大忽悠
 * @create 2022/9/17 17:19
 */
public class WheelTimerTest 
    @BeforeTest
    public void beforeTest()
        WheelTimerHelper.newWheelTimer(100L);
    

    @Test
    public void testWheelTimer() throws InterruptedException 
        OrderDelayEvent orderDelay =  OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
        OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 20L);
        Timeout timeout = WheelTimerHelper.addNewTask(orderDelay);
        Timeout timeout1 = WheelTimerHelper.addNewTask(orderDelay1);

        //订单二在到期前成功结算,因此不需要取消
        orderDelay1.getOrder().submitOrder();
        //取消延迟任务二
        timeout1.cancel();

        //阻塞,防止程序结束
        Thread.sleep(TimeUnit.SECONDS.toMillis(100L));
    


详细代码实现,可以fork仓库看源码


优缺点

时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务。


小结

本文主要对延迟任务基于内存的单体应用实现给出了两种解决策略,下一篇文章中,我们将针对基于内存的单体解决方法缺陷,给出基于redis和mq实现介绍。

以上是关于延迟任务多种实现姿势--上的主要内容,如果未能解决你的问题,请参考以下文章

延迟任务多种实现姿势--下

RabbitMQ 延迟队列实现定时任务的正确姿势

RabbitMQ 延迟队列实现定时任务的正确姿势

什么可能会延迟我的芹菜任务?

Jenkins连接k8s的多种姿势

在页面上延迟应用 css