线程池的使用

Posted blacksmallcat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的使用相关的知识,希望对你有一定的参考价值。

1...线程池参数的配置

executor:
  taskTimeSeconds: 0.1
  tasksParSecond: 100
  taskWaitTimeSeconds: 0.01
  uCPU: 0.8
  taskResponseTimeSeconds: 0.02
  keepAliveTime: 60
  rejectedType: 0
  queueType: 0
  unitType: 0

2...注入线程池

package com.test.domi.config;

import com.test.domi.executor.ThreadPoolUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class executorConfig {

    @Value("${executor.taskResponseTimeSeconds}")
    private double taskResponseTimeSeconds;
    @Value("${executor.taskWaitTimeSeconds}")
    private double taskWaitTimeSeconds;

    @Value("${executor.taskTimeSeconds}")
    private double taskTimeSeconds;
    @Value("${executor.tasksParSecond}")
    private int tasksParSecond;
    @Value("${executor.uCPU}")
    private double uCPU;

    @Value("${executor.keepAliveTime}")
    private long keepAliveTime;
    @Value("${executor.rejectedType}")
    private int rejectedType;
    @Value("${executor.queueType}")
    private int queueType;
    @Value("${executor.unitType}")
    private int unitType;

    @Bean("executor")
    public ThreadPoolExecutor getThreadPoolExecutor(){
        ThreadPoolUtil threadPoolUtil = new ThreadPoolUtil(taskTimeSeconds, tasksParSecond, uCPU, taskResponseTimeSeconds,
                taskWaitTimeSeconds,  keepAliveTime,  unitType, queueType, rejectedType);
        return threadPoolUtil.getThreadPoolExecutorForCPU();
    }
}

3...自定义线程工厂

package com.test.domi.executor;

import org.apache.commons.lang.StringUtils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    WarmThreadFactory(String poolName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        poolName = StringUtils.isBlank(poolName) ? "-" : (poolName + "-");
        namePrefix = poolName + "pool-" +  poolNumber.getAndIncrement() +  "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        String rName = r.toString();
        String threadName = namePrefix + threadNumber.getAndIncrement() + "-" + rName.substring((rName.indexOf("$") + 1),rName.indexOf("["));
        Thread t = new Thread(group, r,threadName,0);
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }

}

4...自定义拒绝策略

package com.test.domi.executor;

import com.alibaba.fastjson.JSON;

import java.util.Random;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    private static final AbortTaskQueue abortTaskQueue = AbortTaskQueue.newInstance();

    public AbortPolicyWithReport() {}
                                                                                   
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //todo 记录任务至于DB,MQ,等,实现最终一致性.
        abortTaskQueue.add(r);
        String param = JSON.toJSONString(r);
//        if (new Random().nextInt(6) > 1) {
//            Runnable poll = abortTaskQueue.poll();
//            System.out.println("我消费了一个");
//        }
        System.out.println("线程" + Thread.currentThread().getName() + "丢弃任务,参数为==》" + param);
        throw new RejectedExecutionException("Task-" + r.toString() + " -rejected from- " + e.toString());
    }


}

5...自定义拒绝队列之外的任务队列(单例,安全)

package com.test.domi.executor;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class AbortTaskQueue implements Serializable {
    private static  AbortTaskQueue aborttaskqueue = null;
    private static final ConcurrentLinkedQueue<Runnable> concurrentLinkedQueue = new ConcurrentLinkedQueue();
    private static final int MAX_LENGTH = 10000;
    private final AtomicInteger count = new AtomicInteger(0);

    private AbortTaskQueue() {}
    public Runnable poll() {
        Runnable poll = concurrentLinkedQueue.poll();
        if (poll == null) {
            count.set(0);
        }
        count.decrementAndGet();
        return poll;
    }

    public void add(Runnable r) {
        int total = count.get();
        if (total < MAX_LENGTH) {
            concurrentLinkedQueue.add(r);
            count.getAndIncrement();
        }
    }

    public static AbortTaskQueue newInstance(){
        if (aborttaskqueue == null) {
            synchronized (AbortTaskQueue.class) {
                if (aborttaskqueue == null) {
                    aborttaskqueue = new AbortTaskQueue();
                }
            }
        }
        return aborttaskqueue;
    }

    public int size(){
        return count.get();
    }
}

6...自定线程池

package com.test.domi.executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class WarmThreadPoolExecutor extends ThreadPoolExecutor {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory);
    }
    public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
    }


    protected void beforeExecute(Thread t,Runnable r) {
        super.beforeExecute(t,r);
        log.info(String.format("Thread %s: start======> %s",t,r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r,Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.info(String.format("Thread %s: end=====> %s, time = %dns", t, r, taskTime));
        }finally {
            super.afterExecute(r,t);
        }
    }

    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time ======> %dns", totalTime.get() / numTasks.get()));
        }finally {
            super.terminated();
        }
    }
}

7...注入线程池的工具类

package com.test.domi.executor;

import java.math.BigDecimal;
import java.util.concurrent.*;

public class ThreadPoolUtil {
    private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
    private static final String POOL_NAME_PRE = "cpu-task";

    private double taskResponseTimeSeconds;
    private double taskWaitTimeSeconds;
    private double taskTimeSeconds;
    private int tasksParSecond;
    private double uCPU;
    private long keepAliveTime;
    private int rejectedType;
    private int queueType;
    private int unitType;


    public ThreadPoolUtil(double taskTimeSeconds,int tasksParSecond,double uCPU,double taskResponseTimeSeconds,
                          double taskWaitTimeSeconds, long keepAliveTime, int unitType,int queueType,int rejectedType){
      this.taskTimeSeconds = taskTimeSeconds;
      this.tasksParSecond = tasksParSecond;
      this.uCPU = uCPU;

      this.taskResponseTimeSeconds = taskResponseTimeSeconds;
      this.taskWaitTimeSeconds = taskWaitTimeSeconds;

      this.keepAliveTime = keepAliveTime;
      this.unitType = unitType;
      this.queueType = queueType;
      this.rejectedType = rejectedType;
    }


    public ThreadPoolExecutor getThreadPoolExecutorForIO() {
        return getThreadPoolExecutor(2);
    }

    public ThreadPoolExecutor getThreadPoolExecutorForCPU() {
        return getThreadPoolExecutor(1);

    }

    private ThreadPoolExecutor getThreadPoolExecutor(int executorForType) {
        int corePoolSize = getCorePoolSize();
        int maxPoolSize = getMaxPoolSize(corePoolSize,1);
        TimeUnit timeUnit = getTimeUnit(unitType);
        BlockingQueue queue = getBlockingQueue(queueType,corePoolSize);
        System.out.println("创建线程池的参数为corePoolSize=" + corePoolSize + "maxPoolSize = " + maxPoolSize + "queueSize=" + getQueueSize(corePoolSize));
        ThreadPoolExecutor warmThreadPoolExecutor = new WarmThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
                queue, new WarmThreadFactory(POOL_NAME_PRE), new AbortPolicyWithReport());
        return warmThreadPoolExecutor;
    }

    private int getMaxPoolSize(int corePoolSize,int executorForType) {
        int maxPoolSize;
        if (executorForType == 1) {
            maxPoolSize = getMaxCorePoolSizeForCPUtask();
        } else {
            maxPoolSize = getMaxCorePoolSizeForIOtask();
        }
        return maxPoolSize < corePoolSize ? corePoolSize : maxPoolSize;
    }


    private TimeUnit getTimeUnit(int timeUnit){
        TimeUnit resultTimeUnit = TimeUnit.MILLISECONDS;
        switch (timeUnit){
            case 1:
                resultTimeUnit =  TimeUnit.SECONDS;
                break;
            case 2:
                resultTimeUnit =  TimeUnit.MICROSECONDS;
                break;
            case 3:
                resultTimeUnit =  TimeUnit.NANOSECONDS;
                break;
            case 4:
                resultTimeUnit =  TimeUnit.MINUTES;
                break;
            case 5:
                resultTimeUnit =  TimeUnit.HOURS;
                break;
            case 6:
                resultTimeUnit =  TimeUnit.DAYS;
                break;
            default:
                break;
        }
        return resultTimeUnit;
    }

    private BlockingQueue getBlockingQueue(int queueType, int corePoolSize){
        BlockingQueue queue = new ArrayBlockingQueue(getQueueSize(corePoolSize));
        switch (queueType){
            case 1:
                queue =  new LinkedBlockingQueue();
                break;
            case 2:
                queue =  new SynchronousQueue();
                break;
            default:
                break;
        }
        return queue;
    }

    private RejectedExecutionHandler getRejectedExecutionHandler(int rejectedType){
        RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicyWithReport();
        switch (rejectedType) {
            case 1:
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
                break;
            case 2:
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
                break;
            case 3:
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
                break;
            default:
                break;
        }
        return rejectedExecutionHandler;
    }

    private int getCorePoolSize(){
        double size = tasksParSecond * taskTimeSeconds;
        return getIntFromDouble(size);
    }

    private int getMaxCorePoolSizeForCPUtask(){
        return PROCESSORS + 1;
    }

    private int getMaxCorePoolSizeForIOtask(){
        if (uCPU <= 0 || taskTimeSeconds <= 0 || taskWaitTimeSeconds <= 0) {
            return 2 * PROCESSORS;
        }
        double size = PROCESSORS * uCPU * (taskWaitTimeSeconds / taskTimeSeconds);
        return getIntFromDouble(size);
    }

    private int getQueueSize(int corePoolSize){
        double size = corePoolSize / taskTimeSeconds * taskResponseTimeSeconds;
        return getIntFromDouble(size);
    }

    private int getIntFromDouble(double size) {
        BigDecimal bsize = new BigDecimal(size).setScale(0, BigDecimal.ROUND_HALF_UP);
        return Integer.parseInt(bsize.toString());
    }



}

8...封装cpu密集型的Task

package com.test.domi.task;

import java.util.concurrent.Callable;

/**
 * 如何设计一个任务:
 * 1- 是否具备可取消操作
 * 2- 是否能响应中断,中断是实现取消的最佳实践
 * 3- call方法中要允许线程的退出
 */
public class TestFutureTask implements Callable {
    private static final String TASK_NAME = "TEST_TASK";
    //取消标志位
    private volatile boolean cancelled;
    private String param1;
    private String param2;

    public TestFutureTask(String param1,String param2) {
        this.param1 = param1;
        this.param2 = param2;
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(2000);
        //TODO 设置允许线程退出的方法
        return param2 + param1;
    }

    public void cancel() {
        cancelled = true;
    }

    public String getParam() {
        return param1 + param2;
    }

}

9...封装IO密集型的阻塞,可相应中断的task:https://blog.csdn.net/xrr0520blog/article/details/8912139

10...测试

package com.test.domi;

import com.test.domi.executor.AbortTaskQueue;
import com.test.domi.task.TestFutureTask;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TestModulServiceApplication.class)
public class TestDemo {

    @Autowired
    @Qualifier("executor")
    private ThreadPoolExecutor threadPoolExecutor;

//    private CountDownLatch  count = new CountDownLatch(1);

    @Test
    public void redisTest() throws Exception{

        List<Future<String> > list = new ArrayList<>(32);
        for (int i = 0; i < 60; i++) {
            Future<String> submit = threadPoolExecutor.submit(new TestFutureTask("nihao","Task"));
            try {
                String s = submit.get(3000, TimeUnit.MILLISECONDS);
                //点进去看该方法会抛出3种异常
                list.add(submit);
            } catch (TimeoutException e) {
                System.out.println("任务执行失败msg=" + e.getMessage());
                e.printStackTrace();
            }catch (ExecutionException e) {
               throw new Exception("");
            }catch (InterruptedException e) {
            }finally {
                submit.cancel(true);
            }
        }
//        count.countDown();
        for (Future<String> stringFuture : list) {
            try {
                System.out.println("result=" + stringFuture.get());
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("FUTURN返回失败");
            }
        }
        System.out.println("任务执行完毕queueSize = " + threadPoolExecutor.getQueue().size());
        threadPoolExecutor.shutdown();
        AbortTaskQueue abortTaskQueue = AbortTaskQueue.newInstance();
        int size = abortTaskQueue.size();
        System.out.println("任务执行完毕queueSize = ");

//        if (threadPoolExecutor != null) {
//            int activeCount = threadPoolExecutor.getActiveCount();
//            int corePoolSize = threadPoolExecutor.getCorePoolSize();
//            String s = threadPoolExecutor.toString();
//            long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
//            int poolSize = threadPoolExecutor.getPoolSize();
//            int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
//            BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
//            RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
//        }
    }
}

 

以上是关于线程池的使用的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Java线程池详解

Java线程池详解

Java 线程池详解

线程池的使用场景和代码实现!

Java多线程-线程池的使用与线程总结(狂神说含代码)