线程池工具类几种实现

Posted stubborn-dude

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池工具类几种实现相关的知识,希望对你有一定的参考价值。

一 线程池工具类
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**

  • @Description 线程池工具类
    */
    public class ThreadPoolUtil {

    /**

    • 核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
      /
      private static final int SIZE_CORE_POOL = 5;
      /
      *
    • 线程池维护线程的最大数量
      /
      private static final int SIZE_MAX_POOL = 10;
      /
      *
    • 线程池维护线程所允许的空闲时间
      /
      private static final long ALIVE_TIME = 2000;
      /
      *
    • 线程缓冲队列
      */
      private static BlockingQueue bqueue = new ArrayBlockingQueue(100);
      private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());

    static {
    pool.prestartAllCoreThreads();
    }

    public static ThreadPoolExecutor getPool() {
    return pool;
    }
    }

测试类
import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;

public class TestUtil {
public static void main(String[] args) {
ThreadPoolUtil.getPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("线程池调用");
}
});
}
}

二 线程池支持多线程返回结果
import org.springframework.stereotype.Service;

/**

  • ClassName:CommenThreadPoolUtil
  • Function:线程池公共入口处理类.

*/
@Service
public class CommonThreadPoolUtil {

// 核心线程数(默认初始化为10)
private int cacheCorePoolSize = 8;

// 核心线程控制的最大数目
private int maxCorePoolSize = 160;

// 队列等待线程数阈值
private int blockingQueueWaitSize = 16;

// 核心线程数自动调整的增量幅度
private int incrementCorePoolSize = 4;

// 初始化线程对象ThreadLocal,重写initialValue(),保证ThreadLocal首次执行get方法时不会null异常
private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {

    protected List<Future<?>> initialValue() {

        return new ArrayList<Future<?>>();
    }
};

// 初始化线程池
private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());

/**
 *
 * dealTask:(线程池执行操作-包含每个进程返回结果). <br/>
 * 1、运用场景:例如,需要同时校验很多不同的逻辑,依赖于获取校验结果响应给用户; 2、具体实现java类:implements
 * 的Callable接口,重写call方法即可,支持返回值
 *
 * @author
 * @param callable
 * @return
 */
public Map<String, Object> dealTask(Callable<?> callable) {

    try {
        // 动态更改核心线程数大小
        dynamicTuningPoolSize();
        // 执行线程业务逻辑及获取返回结果
        Future<?> result = ThreadPool.submit(callable);
        // 获取当前进程的局部变量
        List<Future<?>> threadLocalResult = threadlocal.get();
        // 叠加主进程对应的多个进程处理结果
        threadLocalResult.add(result);
        // 设置最新的threadLocal变量到当前主进程
        threadlocal.set(threadLocalResult);
    } catch (Exception e) {
        e.printStackTrace();
        return errorResp("线程池发生异常-Future", null);
    }
    return successResp(null);
}

/**
 *
 * dealTask:(线程池执行操作-不包含每个进程返回结果). <br/>
 * 1、运用场景:例如,不依赖于响应给用户执行结果的业务逻辑 ; 2、具体实现java类:implements
 * 的Runnable接口,重写run方法,没有返回值
 *
 * @author
 * @param runnable
 * @return
 */
public Map<String, Object> dealTask(Runnable runnable) {

    try {
        // 动态更改核心线程数大小
        dynamicTuningPoolSize();
        // 执行线程业务逻辑
        ThreadPool.execute(runnable);
    } catch (Exception e) {
        e.printStackTrace();
        return errorResp("线程池发生异常", null);
    }
    return successResp(null);
}

/**
 * obtainTaskFuture:(获取线程池执行结果:此为阻塞线程,即所有线程都执行完成才能获取结果,故应将执行时间稍长的业务逻辑先执行,
 * 减少等待时间). <br/>
 * 此方法只能调用一次,即调用之后清除ThreadLocal变量,以便于同一进程再次调用线程池获取最新的执行结果以及释放内存, 防止内存泄露
 *
 * @author
 * @return
 */
public Map<String, Object> obtainTaskFuture() {

    List<Future<?>> threadLocalResult = null;
    try {
        // 获取当前进程变量
        threadLocalResult = threadlocal.get();
        if (threadLocalResult == null || threadLocalResult.size() == 0) {
            return errorResp("获取线程池执行结果为空", null);
        } else {
            return successResp(threadLocalResult);
        }
    } catch (Exception e) {
        return errorResp("获取线程池执行结果发生异常:" + e.getMessage(), null);
    } finally {
        // 1、释放内存;2、防止主进程再次调用线程池方法时对结果互有影响。
        threadlocal.remove();
    }

}

/**
 *
 * dynamicTuningPoolSize:(动态改变核心线程数). <br/>
 *
 * @author
 * @return
 */
private void dynamicTuningPoolSize() {

    // 队列等待任务数(此为近似值,故采用>=判断)
    int queueSize = ThreadPool.getQueueSize();
    // 动态更改核心线程数大小
    if (queueSize >= blockingQueueWaitSize) {
        // 核心线程数小于设定的最大线程数才会自动扩展线程数
        if (cacheCorePoolSize <= maxCorePoolSize) {
            // 原有核心线程数
            int corePoolSize = ThreadPool.getCorePoolSize();
            // 将要累积的核心线程数
            int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
            ThreadPool.setCorePoolSize(currentcorePoolSize);
            ThreadPool.setMaximumPoolSize(currentcorePoolSize);
            cacheCorePoolSize = currentcorePoolSize;
            System.out.println("动态改变线程池大小====原核心线程池数目为:" + corePoolSize + ";现累加为:" + currentcorePoolSize);
        } else {
            System.out.println("动态改变线程池大小====核心线程池数目已累加为:" + cacheCorePoolSize + ";不会继续无限增加");
        }
    }
}

/**
 * 获取核心线程数 getCacheCorePoolSize:(). <br/>
 *
 * @author
 * @return
 */
public int getCacheCorePoolSize() {

    return ThreadPool.getCorePoolSize();
}

/**
 * 设置核心线程数 setCacheCorePoolSize:(). <br/>
 *
 * @author
 * @param cacheCorePoolSize
 */
public void setCacheCorePoolSize(int cacheCorePoolSize) {

    ThreadPool.setCorePoolSize(cacheCorePoolSize);
    ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
    this.cacheCorePoolSize = cacheCorePoolSize;
}

/**
 *
 * successResp:(正确响应信息). <br/>
 *
 * @author
 * @param data
 * @return
 */
private Map<String, Object> successResp(Object data) {

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("status", "0");
    result.put("data", data);
    return result;

}

/**
 *
 * errorResp:(错误响应信息). <br/>
 *
 * @author
 * @param errorMsg
 * @param data
 * @return
 */
public Map<String, Object> errorResp(String errorMsg, Object data) {

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("status", "1");
    result.put("msg", errorMsg);
    result.put("data", data);
    return result;

}

}

创建线程池类

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {

// 初始化父类构造函数及startTime
public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
		long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {

	super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

// 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务(已执行的任务不会停止)
@Override
public void shutdown() {

	super.shutdown();

}

// 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。
@Override
public List<Runnable> shutdownNow() {

	return super.shutdownNow();

}

// 在执行给定线程中的给定 Runnable 之前调用的方法.可用于重新初始化ThreadLocals或者执行日志记录。
@Override
protected void beforeExecute(Thread t, Runnable r) {

	super.beforeExecute(t, r);
}

// 基于完成执行给定 Runnable 所调用的方法
@Override
protected void afterExecute(Runnable r, Throwable t) {

	super.afterExecute(r, t);

	try {
		// Future<?> result = (Future<?>) r;
		// "任务结果:" result.get();
	} catch (Exception e) {
	}
}

/**
 * 
 * getQueueSize:(已执行的任务数). <br/>
 *
 * @author
 * @return
 */
@Override
public long getCompletedTaskCount() {

	return super.getCompletedTaskCount();
}

/**
 * 
 * getQueueSize:(正在运行的任务数). <br/>
 *
 * @author
 * @return
 */
@Override
public int getActiveCount() {

	return super.getActiveCount();
}

/**
 * 
 * getQueueSize:(队列等待任务数). <br/>
 *
 * @author
 * @return
 */
public int getQueueSize() {

	return getQueue().size();
}

}
测试类
public class TestUtil {
public static void main(String[] args) {

    CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
    poolUtil.dealTask(new Runnable() {
        @Override
        public void run() {
            System.out.println("线程池调用");
        }
    });
}

}

三 jdk1.5之后提供工具类 Executors
工具类Executors面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:

  • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制(Interger. MAX_VALUE),线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

  • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

  • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

总结:除了newScheduledThreadPool的内部实现特殊一点之外,其它线程池内部都是基于 ThreadPoolExecutor 类(Executor的子类)实现的。
实现:
public class TestUtil {
public static void main(String[] args) {

    ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
    scheduExec.schedule(new Runnable() {

        @SuppressWarnings("static-access")
        @Override
        public void run() {
            System.out.println("20秒后处理");
        }
    }, 20, TimeUnit.SECONDS);
}

}
周期性定时任务20秒后执行


















































以上是关于线程池工具类几种实现的主要内容,如果未能解决你的问题,请参考以下文章

javaJava运行时动态生成类几种方式

java多线程-Executors实现的几种线程池以及Callable

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

线程池-实现一个取消选项

深度几种线程池的实现算法分析

5种方法,教你判断线程池是不是全部完成