Java多线程工具包java.util.concurrent---ExecutorService

Posted yvan1115

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程工具包java.util.concurrent---ExecutorService相关的知识,希望对你有一定的参考价值。

什么是ExecutorService

java.util.concurrent.ExecutorService 接口表示一个异步执行机制,使我们能够在后台执行任务。因此一个 ExecutorService 很类似于一个线程池。实际上,存在于 java.util.concurrent 包里的 ExecutorService 实现就是一个线程池实现。

ExecutorService的实现

ExecutorService 接口的以下实现类:

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

工厂java.util.concurrent.Executors 类

Executors提供了多个工厂方法,下面来具体说明
首先看下ThreadPoolExecutor

int  corePoolSize  =    5;  
int  maxPoolSize   =   10;  
long keepAliveTime = 5000;  

ExecutorService threadPoolExecutor =  
        new ThreadPoolExecutor(  
                corePoolSize,  
                maxPoolSize,  
                keepAliveTime,  
                TimeUnit.MILLISECONDS,  
                new LinkedBlockingQueue<Runnable>()  
                );  

以下是jdk1.6中文API对参数解释

corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。

这里说明一下corePoolSize
核心线程会一直存活,即使没有任务需要执行

  • newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    

由参数可知,创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务
示例

package com.yvacn.executorServiceCallable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 
 * @author yvan
 *
 */
public class AppMainTestSingle 
    public static void main(String[] args) 
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "执行" + this.toString());
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + "执行" + this.toString());
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.shutdown();
    

结果

pool-1-thread-1执行com.yvacn.executorServiceCallable.AppMainTestSingle$1@6695b54d
pool-1-thread-1执行com.yvacn.executorServiceCallable.AppMainTestSingle$2@685257ef

由结果也可以看出,线程池中有一个线程且是顺序执行(FIFO)

  • newFixedThreadPool

 public static ExecutorService newFixedThreadPool(int nThreads) 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待

示例

package com.yvacn.executorServiceCallable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 
 * @author yvan
 *
 */
public class AppMainTestFixedPool 
    public static void main(String[] args) 
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(500);
                    System.out.println("我是线程一");
                    System.out.println(Thread.currentThread().getName() + "执行" );
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(200);
                    System.out.println("我是线程二");
                    System.out.println(Thread.currentThread().getName() + "执行" );
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(100);
                    System.out.println("我是线程三");
                    System.out.println(Thread.currentThread().getName() + "执行");
                 catch (Exception e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.shutdown();
    


结果

我是线程二
pool-1-thread-2执行
我是线程三
pool-1-thread-2执行
我是线程一
pool-1-thread-1执行

由结果可以看出,虽然提交三个执行任务,但是因为固定的线程池为2,只创建了2个线程在执行任务
-

newCachedThreadPool


创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

public static ExecutorService newCachedThreadPool() 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    

示例

package com.yvacn.executorServiceCallable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * 
 * @author yvan
 *
 */
public class AppMainTestFixedPool 
    public static void main(String[] args) 
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(500);
                    System.out.println("我是线程一");
                    System.out.println(Thread.currentThread().getName() + "执行" );
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(200);
                    System.out.println("我是线程二");
                    System.out.println(Thread.currentThread().getName() + "执行" );
                 catch (InterruptedException e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    TimeUnit.MILLISECONDS.sleep(100);
                    System.out.println("我是线程三");
                    System.out.println(Thread.currentThread().getName() + "执行");
                 catch (Exception e) 
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                
            
        );
        executorService.shutdown();
    

结果

我是线程三
pool-1-thread-3执行
我是线程二
pool-1-thread-2执行
我是线程一
pool-1-thread-1执行

  • newScheduledThreadPool


    创建一个定长线程池,支持定时及周期性任务执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
        return new ScheduledThreadPoolExecutor(corePoolSize);
    

示例

package com.yvan.scheduledExecutorService;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * 
 * @author yvan
 *
 */
public class AppMain 

    private static int count = 0;


    private final static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception 
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        executorService.scheduleAtFixedRate(new Runnable() 
            @Override
            public void run() 
                count++;
                System.out.println(format.format(new Date()) + "---执行" + count + "次");
            
        , // 执行线程
                1000, // 初次延迟多久执行
                5000, // 执行间隔时间
                TimeUnit.MILLISECONDS // 执行间隔时间类型,此处为毫秒
        );
        ScheduledFuture<Object> reuslt = executorService.schedule(new Callable<Object>() 

            @Override
            public Object call() throws Exception 
                return count;
            

        , 7000, TimeUnit.MILLISECONDS);
        System.out.println(reuslt.get().toString());
        executorService.shutdown();
    

结果

2017-11-23 15:33:28—执行1次
2017-11-23 15:33:33—执行2次
2

execute和submit

下面的示例中说明了他们的不同以及用法

package com.yvacn.executorServiceCallable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * 
 * @author yvan
 *
 */
public class AppMain 
    public static void main(String[] args) throws Exception 
        ExecutorService eService = Executors.newCachedThreadPool();
        //execute Runnable
        eService.execute(new Runnable() 
            @Override
            public void run() 
                try 
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println(Thread.currentThread().getName()+"   执行完毕");;
                 catch (InterruptedException e) 
                    e.printStackTrace();
                

            
        );
        //submit Callable
        //call函数返回的结果放入Future中
        Future<Object> future = eService.submit(new Callable<Object>() 
            @Override
            public Object call() throws Exception 
                System.out.println(Thread.currentThread().getName()+"开始执行");
                TimeUnit.MILLISECONDS.sleep(1000);
                return Thread.currentThread().getName()+"   执行完毕";
            
        );
        // submit Runnable
        //如果返回的结果为null说明线程执行完毕
        Future<?> future1 = eService.submit(new Runnable() 

            @Override
            public void run() 
                try 
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    TimeUnit.MILLISECONDS.sleep(5000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        );
        System.out.println(future.get().toString());
        while (future1.get()==null) 
            System.out.println( "submit Runnable 执行完毕");
            break;
        
        eService.shutdown();

    

如何终止线程

  • shutdown()

ExecutorService
并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在
shutdown() 被调用之前所有提交给 ExecutorService 的任务都被执行。

  • shutdownNow()

如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束。

以上是关于Java多线程工具包java.util.concurrent---ExecutorService的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程工具包java.util.concurrent---CyclicBarrier

Java多线程工具包java.util.concurrent---ExecutorService

Java多线程工具包java.util.concurrent---ReadWriteLock

Java多线程工具包java.util.concurrent---Lock

Java多线程_同步工具CyclicBarrier

Java多线程(线程池原子性并发工具类)