环形队列高效触发大量超时任务的算法实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了环形队列高效触发大量超时任务的算法实现相关的知识,希望对你有一定的参考价值。

基于环形队列的超时触发算法只需要一个timer即可实现批量超时任务的触发,CPU消耗低,效率高。下面是此算法的简单实现。

1,TaskHolder.java

package com.zws.timer;
/**
 * 
 * @author wensh.zhu
 * @date 2018-04-22
 */
public class TaskHolder {
	
	/** 任务所需等待的圈数,即任务需要走几圈**/
	private int cycles;
	private int delays;
	private Runnable task;
	
	public TaskHolder() {}

	public TaskHolder(int cycles, int delays, Runnable task) {
		this.cycles = cycles;
		this.delays = delays;
		this.task = task;
	}
	
	public boolean isTimeOut() {
		return cycles <= 0;
	}
	
	public void cutDown() {
		cycles --;
	}

	public int getCycles() {
		return cycles;
	}

	public void setCycles(int cycles) {
		this.cycles = cycles;
	}

	public int getDelays() {
		return delays;
	}

	public void setDelays(int delays) {
		this.delays = delays;
	}

	public Runnable getTask() {
		return task;
	}

	public void setTask(Runnable task) {
		this.task = task;
	}

	@Override
	public String toString() {
		return "TaskHolder[cycles=" + cycles + ", delays=" + delays + "]";
	}
	
}

2,TimerContext.java

package com.zws.timer;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 
 * @author wensh.zhu
 * @date 2018-04-22
 */
public class TimerContext {

	public static final int DEFAULT_TICKS = 60;
	public static final int DEFAULT_TICK_DURATION = 1;
	
	private Map<Integer, Queue<TaskHolder>> taskHolders;
	private volatile int currentTick = 0;
	
	/** tick一圈的长度 **/
	private int ticks = DEFAULT_TICKS;
	
	/** 每tick一次的时间间隔,单位:秒**/
	private int tickDuration = DEFAULT_TICK_DURATION;
	
	public TimerContext() {
		init();
	}

	public TimerContext(int ticks, int tickDuration) {
		if (ticks <= 0)
			throw new IllegalArgumentException("ticks must be greater than 0");
		
		if (tickDuration <= 0)
			throw new IllegalArgumentException("tickDuration must be greater than 0");
		
		this.ticks = ticks;
		this.tickDuration = tickDuration;
		init();
	}
	
	private void init() {
		taskHolders = new ConcurrentHashMap<Integer, Queue<TaskHolder>>();
		for (int i = 0; i < ticks; i ++)
			taskHolders.put(i, new ConcurrentLinkedQueue<TaskHolder>());
	}
	
	/**
	 * 添加一个定时任务并计算需要走的圈数和落脚的index
	 * @param task
	 * @param delays
	 */
	public void addTask(Runnable task, int delays) {
		if (task == null) 
			throw new NullPointerException("task must not be null");
		
		if (delays <=0) 
			throw new IllegalArgumentException("delays must be greater than 0");
		
		int allSeconds = ticks * tickDuration;
		int cycles = delays / allSeconds;
		int index = ((delays % allSeconds) / tickDuration) + currentTick;
		TaskHolder metaData = new TaskHolder(cycles, delays, task);
		taskHolders.get(index).add(metaData);
	}
	
	public int tick() {
		currentTick = (currentTick + 1) % ticks;
		return currentTick;
	}
	
	public Queue<TaskHolder> getCurrentTasks() {
		return taskHolders.get(currentTick);
	}

	public int getCurrentTick() {
		return currentTick;
	}

	public int getTicks() {
		return ticks;
	}

	public int getTickDuration() {
		return tickDuration;
	}
	
	@Override
	public String toString() {
		return "TimerContext [timers=" + taskHolders + ", ticks=" + ticks + ", tickDuration=" + tickDuration
				+ ", currentTick=" + currentTick + "]";
	}
}

3,TimerScheduler.java

package com.zws.timer;

import java.io.IOException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
/**
 * 用于判断定时器是否到时、执行任务、维护定时器状态。
 * @author wensh.zhu
 * @date 2018-04-22
 */
public class TimerScheduler extends TimerTask {
	
	private TimerContext timerContext;
	
	public TimerScheduler() {}

	public TimerScheduler(TimerContext timerContext) {
		this.timerContext = timerContext;
	}
	
	/**
	 * 定时检测,如果定时器触发时间到了就从集合中删除并执行任务,否则圈数减一。
	 */
	@Override
	public void run() {
		if (timerContext == null) 
			return;
		
		Queue<TaskHolder> timers = timerContext.getCurrentTasks();
		Iterator<TaskHolder> itor = timers.iterator();
		while (itor.hasNext()) {
			TaskHolder timer = itor.next();
			if (timer.isTimeOut()) {
				itor.remove();
				new Thread(timer.getTask()).start();
			} else {
				timer.cutDown();
			}
		}
		
		timerContext.tick();
	}
	
	public void addTask(Runnable task, int delays) {
		timerContext.addTask(task, delays);
	}

	public TimerContext getTimerContext() {
		return timerContext;
	}

	public void setTimerContext(TimerContext timerContext) {
		this.timerContext = timerContext;
	}
	
	public static void main(String[] args) throws IOException {
		TimerContext context = new TimerContext(60, 1);
		TimerScheduler sheduler = new TimerScheduler(context);
		sheduler.addTask(new Runnable() {
			
			public void run() {
				System.out.println(DateUtils.now());
			}
		}, 60);
		System.out.println(DateUtils.now());
		
		Timer timer = new Timer();
		timer.scheduleAtFixedRate(sheduler, 0, context.getTickDuration() * 1000L);
		
		System.in.read();
	}
	
}

4,DateUtils.java

package com.zws.timer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * 
 * @author wensh.zhu
 * @date 2018-04-22
 */
public class DateUtils {
	public static final String DEFAULT_PATTERN = "yyyy-MM-dd HH:mm:ss";

	public static String now() {
		LocalDateTime time = LocalDateTime.now();
		return time.format(DateTimeFormatter.ofPattern(DEFAULT_PATTERN));
	}
	
	public static String plusSeconds(int seconds) {
		LocalDateTime time = LocalDateTime.now();
		time.plusSeconds(seconds);
		return time.format(DateTimeFormatter.ofPattern(DEFAULT_PATTERN));
	}
}


以上是关于环形队列高效触发大量超时任务的算法实现的主要内容,如果未能解决你的问题,请参考以下文章

管理大量定时任务,如果高效触发超时?

分布式任务调度架构原理和设计介绍

10w定时任务,如何高效触发超时

简单说说Kafka中的时间轮算法

采用简易的环形延时队列处理秒级定时任务的解决方案

原 荐 简单说说Kafka中的时间轮算法