异步多线程----执行器(Executor)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步多线程----执行器(Executor)相关的知识,希望对你有一定的参考价值。
参考技术A 如果 程序中创建了大量的生命期很短的线程 ,应该 使用线程池(thread pool) 。一个线程池中包含许多准备运行的空线程。将Runnable对象交给线程池,就会有一个线程调用run方法。当run方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。
另一个使用线程池的理由是减少并发线程的数目 。 创建大量线程会大大降低性能甚至使虚拟机崩溃 。如果有一个会创建许多线程的算法, 应该使用一个线程数“固定的”线程池 以限制并发线程的总数。
执行器(Executor)类有许多静态工厂方法用来构建线程池。
上面3个方法返回实现了ExecutorService接口 的 ThreadPoolExecutor类的对象 。
可以使用下面的方法 将一个Runnable对象或Callable对象提交给ExecutorService :
该线程池会在方便的时候尽早执行提交的任务。 调用submit时 , 会得到一个Future对象 ,可用来查询该任务的状态。
第一个submit方法返回一个Future<?>。可以使用这样一个对象来调用isDone、cancel或isCancelled。但是,get方法在完成的时候只是简单地返回null。
第二版本的Submit也提交一个Runnable,并且Future的get方法在完成的时候返回指定的result对象。
第三个版本的Submit提交一个Callable,并且返回的Future对象将在计算结构准备好的时候得到它。
当用完一个线程池的时候,调用shutdown 。该方法 启动该池的关闭序列 。被关闭的执行器 不再接受新的任务 。当 所有任务都完成以后,线程池中的线程死亡 。另一种方法是调用 shutdownNow 。该池 取消尚未开始的所有任务并试图中断正在运行的线程 。
总结在使用线程池时应该做的事:
继续以一个计算匹配的文件数目程序为例:
ScheduledExecutorService接口 具有 为预定执行 (Scheduled Execution)或 重复执行任务而设计 的方法。它是一种允许使用线程池机制的java.util.Timer的泛化。Executors类的 newScheduledThreadPool 和 newSingleThreadScheduledExecutor方法 将返回实现了ScheduledExecutorService接口的对象。
可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。详见API。
Java-多线程框架Executor(下)
概述
在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java线程既是工作单元,也是执行单元。从JDK1.5开始,把工作单元与执行机制分离开来。工作单元包括Runnable 和 Callable,而执行机制由Executor框架提供。
Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架)
Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
类关系图如下:
在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。
Executor框架简介
Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。
Java线程启动时会创建一个本地操作系统线程;当Java线程终止时,这个操作系统线程也会被回收。操作系统会调用所有线程并将他们分配给可用的CPU。
可以将此种模式分为两层
-
在上层,Java多线程程序通常把应用程序分解为若干任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程
-
在底层,操作系统内核将这些线程映射到硬件处理器上。
两级调度模型的示意图如下:
从图中可以看出,该框架用来控制应用程序的上层调度,下层调度由操作系统内核控制,不受应用程序控制.
Executor框架成员
任务
被执行任务需要实现的接口:Runnable接口和Callable接口
执行任务
任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。
Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExector.
异步计算的结果
Future和实现Future接口的FutureTask类。
Executor框架的类与接口
- Executor是一个接口,Executor框架的基础,它将任务的提交与任务的执行分离。
- Executors 线程池工厂类
- AbstractExecutorService 执行框架抽象类。
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
- Future接口和它的实现FutureTask类,代表异步计算的结果。
- Runnable和Callable接口的实现类,都可以被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor执行.
使用Executor框架
1、主线程首先要创建实现 Runnable接口或者Callable接口的任务对象。Executors可以把一个Runnable对象封装为一个Callable对象,如下
Executors.callable(Runnale task);
或者
Executors.callable(Runnable task, Object result);
2、然后把Runnable对象直接交给ExecutorService执行
ExecutorService.execute(Runnable command);
或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable接口,我们也可以创建FutureTask类,然后直接交给ExecutorService执行。
ExecutorService.submit(Runnable task);
3、最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
Executors 工厂方法
JDK内部提供了五种最常见的线程池。由Executors类的五个静态工厂方法创建
-
newFixedThreadPool
-
newSingleThreadExecutor
-
newCachedThreadPool
-
newSingleThreadScheduledExecutor
-
newScheduledThreadPool
newFixedThreadPool 固定大小的线程池
我们来看下源码及注释
只有一个入参nThreads的静态方法
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* <tt>nThreads</tt> threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
两个入参nThreads和threadFactory的静态方法
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most <tt>nThreads</tt> threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
该方法返回一个包含指定数目线程的线程池,如果任务数量多于线程数目,那么没有没有执行的任务必须等待,直到有任务完成为止
newSingleThreadExecutor 单线程的线程池
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* <tt>newFixedThreadPool(1)</tt> the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
返回单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
说明:LinkedBlockingQueue会无限的添加需要执行的Runnable。
newCachedThreadPool 可缓存的线程池
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to <tt>execute</tt> will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newCachedThreadPool方法创建的线程池可以自动的扩展线程池的容量。核心线程数量为0。
SynchronousQueue是个特殊的队列。 SynchronousQueue队列的容量为0。当试图为SynchronousQueue添加Runnable,则执行会失败。只有当一边从SynchronousQueue取数据,一边向SynchronousQueue添加数据才可以成功。SynchronousQueue仅仅起到数据交换的作用,并不保存线程。但newCachedThreadPool()方法没有线程上限。Runable添加到SynchronousQueue会被立刻取出。
根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于JVM能创建线程的数量,可能引起内存不足
newSingleThreadScheduledExecutor
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* <tt>newScheduledThreadPool(1)</tt> the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
该线程池支持定时以及周期性执行任务的需求。
newScheduledThreadPool 定时任务调度的线程池
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle.
* @param threadFactory the factory to use when the executor
* creates a new thread.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
实例
newFixedThreadPool示例
package com.xgj.master.java.executor.newFixedThreadPool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
/**
*
*
* @ClassName: NewFixedThreadPoolDemo
*
* @Description: NewFixedThreadPool
*
* @author: Mr.Yang
*
* @date: 2017年9月3日 下午9:09:25
*/
public class NewFixedThreadPoolDemo {
@Test
public void test() {
// 通过Executors的静态方法创建一个包含2个固定线程的线程池
ExecutorService fixPool = Executors.newFixedThreadPool(2);
// 第一种形式:通过Callable匿名内部类的形式 创建执行对象
Callable<String> callable = new Callable<String>() {
String result = "Bussiness deals successfully";
@Override
public String call() throws Exception {
System.out.println("Callable is working");
Thread.sleep(5 * 1000);
System.out.println("Callable some bussiness logic is here ");
return result;
}
};
// 第二种形式:通过Runna匿名内部类的形式 创建执行对象
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("Runnable is working");
Thread.sleep(5 * 1000);
System.out.println("Runnable some bussiness logic is here ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 提交两个不同形式创建的任务 (因为创建了2个固定线程的线程池,所以两个都可以提交,如果只有一个线程的话,第二个必须等待
// 任务数量多于线程数目,那么没有没有执行的任务必须等待,直到有任务完成为止。
// Future<> gets parameterized based on how Callable is parameterized
// Since Runnable is not parameterized, you get a Future <?>
Future<String> callableFuture = fixPool.submit(callable);
Future<?> runnableFuture = fixPool.submit(runnable);
// check if tasks are done or not
if (callableFuture.isDone()) {
System.out.println("\\t\\tCallable is done !");
} else {
System.out.println("\\t\\tCallable is not done !");
}
if (runnableFuture.isDone()) {
System.out.println("\\t\\tRunnable is done !");
} else {
System.out.println("\\t\\tRunnable is not done !");
}
// callableFuture有返回值,获取返回值,runnable没有返回值
try {
String result = callableFuture.get();
System.out.println("CallableFuture的返回值为:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 根据需要决定是否需要关闭线程池
fixPool.shutdown();
System.out.println("fixPool shutdown");
}
}
运行结果:
Callable is not done !
Callable is working
Runnable is not done !
Runnable is working
Callable some bussiness logic is here
Runnable some bussiness logic is here
CallableFuture的返回值为:Bussiness deals successfully
fixPool shutdown
newSingleThreadExecutor示例
package com.xgj.master.java.executor.newSingleThreadExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
*
*
* @ClassName: NewSingleThreadExecutorDemo
*
* @Description: newSingleThreadExecutor() returns ExecutorService with one
* thread pool size. ExecutorService uses single thread to execute
* the task. Other task will wait in queue. If thread is
* terminated or interrupted, a new thread will be created.
* ExecutorService guarantees to finish the task if not shutdown
*
* @author: Mr.Yang
*
* @date: 2017年9月3日 下午9:57:56
*/
public class NewSingleThreadExecutorDemo {
@Test
public void test() throws InterruptedException, ExecutionException {
// creates thread pool with one thread
ExecutorService newSingleThreadPool = Executors.newSingleThreadExecutor();
// callable thread starts to execute
Future<Integer> callableFuture = newSingleThreadPool
.submit(new NewSingleThreadExecutorDemo().new CallableThread());
// gets value of callable thread
int callval = callableFuture.get();
System.out.println("Callable:" + callval);
// checks for thread termination
boolean isTerminated = newSingleThreadPool.isTerminated();
System.out.println("newSingleThreadPool isTerminated :" + isTerminated);
// waits for termination for 10 seconds only
newSingleThreadPool.awaitTermination(10, TimeUnit.SECONDS);
newSingleThreadPool.shutdownNow();
System.out.println("newSingleThreadPool shutdownNow ");
}
/**
*
*
* @ClassName: CallableThread
*
* @Description: 内部类, Callable泛型类的入参假设为Integer
*
* @author: Mr.Yang
*
* @date: 2017年9月3日 下午11:05:25
*/
class CallableThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int cnt = 0;
for (; cnt < 5; cnt++) {
Thread.sleep(5 * 1000);
System.out.println("call:" + cnt);
}
return cnt;
}
}
}
运行结果
call:0
call:1
call:2
call:3
call:4
Callable:5
newSingleThreadPool isTerminated :false
newSingleThreadPool shutdownNow
newCachedThreadPool示例
package com.xgj.master.java.executor.newCachedThreadPool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
/**
*
*
* @ClassName: NewCachedThreadPoolDemo
*
* @Description: 1.The pool creates new threads if needed but reuses previously
* constructed threads if they are available.
*
* 2.Cached thread pool helps improve the performance of
* applications that make many short-lived asynchronous tasks.
*
* 3.Only if no threads are available for reuse will a new thread
* be created and added to the pool.
*
* 4.Threads that have not been used for more than sixty seconds
* are terminated and removed from the cache. Hence a pool which
* has not been used long enough will not consume any resources.
*
* @author: Mr.Yang
*
* @date: 2017年9月3日 下午11:15:05
*/
public class NewCachedThreadPoolDemo {
@Test
public void test() {
// Obtain a cached thread pool
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// Create an anonymous Callable<T> object.Override call()
Callable<String> callable = new Callable<String>() {
String message = "Callable is done !";
@Override
public String call() throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("Callable is doing something");
Thread.sleep(500); // make it sleep a little
}
return message;
}
};
// Create an anonymous instance of Runnable
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("\\tRunnable is doing something");
Thread.sleep(1000);
}
} catch (Exception e) {
}
}
};
// Time to run these
// Future<> gets parameterized based on how Callable is parameterized
// Since Runnable is not parameterized, you get a Future <?>
Future<String> callableFuture = cachedThreadPool.submit(callable);
Future<?> runnableFuture = cachedThreadPool.submit(runnable);
// check if tasks are done or not
if (callableFuture.isDone()) {
System.out.println("\\t\\tCallable is done !");
} else {
System.out.println("\\t\\tCallable is not done !");
}
if (runnableFuture.isDone()) {
System.out.println("\\t\\tRunnable is done !");
} else {
System.out.println("\\t\\tRunnable is not done !");
}
try {
// get() waits for the task to finish and then gets the result
String returnedValue = callableFuture.get();
System.out.println(returnedValue);
} catch (InterruptedException e) {
// thrown if task was interrupted before completion
e.printStackTrace();
} catch (ExecutionException e) {
// thrown if the task threw an execption while executing
e.printStackTrace();
}
// shutdown the pool if needed.
cachedThreadPool.shutdown();
}
}
运行结果
Callable is not done !
Runnable is not done !
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is done !
newSingleThreadScheduledExecutor示例
package com.xgj.master.java.executor.newSingleThreadScheduledExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
*
*
* @ClassName: NewSingleThreadScheduleExectorDemo
*
* @Description: scheduleSingleThreadPool
* 更多方法的使用: http://www.codejava.net/java-core/concurrency/java-concurrency-
* scheduling-tasks-to-execute-after-a-given-delay-or-periodically
*
* @author: Mr.Yang
*
* @date: 2017年9月3日 下午11:48:47
*/
public class NewSingleThreadScheduleExectorDemo {
private static String threadNamePrefix = "XiaoGongJiang";
public static void main(String[] args) {
// Get the scheduler
ScheduledExecutorService scheduleSingleThreadPool = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-" + threadNamePrefix);
}
});
// Create an anonymous instance of Runnable
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("Begin");
for (int i = 0; i < 3; i++) {
System.out.println("\\tRunnable is doing something");
Thread.sleep(1000);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
};
// Get a handle, starting now, with a 2 seconds delay,
// and run at fixed rate (5 seconds)
scheduleSingleThreadPool.scheduleAtFixedRate(runnable, 2, 5, TimeUnit.SECONDS);
}
}
运行结果:
Begin
Runnable is doing something
Runnable is doing something
Runnable is doing something
Begin
Runnable is doing something
Runnable is doing something
Runnable is doing something
Begin
........
等上个任务处理完成后,紧接着处理下一个,一直循环下去。
两个方法的区别:
-
scheduleAtFixedRate ,以固定的频率来执行某个任务。
-
scheduleWithFixedDealy, 相对固定的延迟后,执行某个任务。
newScheduledThreadPool示例
package com.xgj.master.java.executor.newScheduledThreadPool;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
*
* @ClassName: NewScheduledThreadPoolDemo
*
* @Description:三个常用方法 1.schedule(): This allows you to schedule a Callable or a
* Runnable for one-shot execution after a specified delay.
*
* 2.scheduleAtFixedRate(): This lets you schedule tasks
* that will first execute after a specified delay and then
* will execute again based on the period you specified. If
* you set the initial delay for five seconds and then
* subsequent period to five seconds then your task will
* first execute five seconds after the first submission and
* then will execute periodically every five seconds.
*
* 3.scheduleWithFixedDelay(): This lets you create tasks
* that will first be executed after the initial delay then
* subsequently with given delay between the termination of
* one execution and commencement of another execution. So
* if you create a task with initial delay of five seconds
* and the subsequent delay of five seconds, the task will
* be executed five seconds after the submission. Once the
* task finishes execution, the scheduler will wait for five
* seconds and then execute the task again.
*
* @author: Mr.Yang
*
* @date: 2017年9月4日 上午12:24:31
*/
public class NewScheduledThreadPoolDemo {
final static DateFormat fmt = DateFormat.getTimeInstance(DateFormat.LONG);
public static void main(String[] args) {
// Create a scheduled thread pool with 5 core threads
ScheduledThreadPoolExecutor sch = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);
// Create a task for one-shot execution using schedule()
Runnable oneShotTask = new Runnable() {
@Override
public void run() {
System.out.println("\\t oneShotTask Execution Time: " + fmt.format(new Date()));
}
};
// Create another task
Runnable delayTask = new Runnable() {
@Override
public void run() {
try {
System.out.println("\\t delayTask Execution Time: " + fmt.format(new Date()));
Thread.sleep(10 * 1000);
System.out.println("\\t delayTask End Time: " + fmt.format(new Date()));
} catch (Exception e) {
}
}
};
// And yet another
Runnable periodicTask = new Runnable() {
@Override
public void run() {
try {
System.out.println("\\t periodicTask Execution Time: " + fmt.format(new Date()));
Thread.sleep(10 * 1000);
System.out.println("\\t periodicTask End Time: " + fmt.format(new Date()));
} catch (Exception e) {
}
}
};
System.out.println("Submission Time: " + fmt.format(new Date()));
// ScheduledFuture<?> oneShotFuture = sch.schedule(oneShotTask, 5,
// TimeUnit.SECONDS);
// ScheduledFuture<?> delayFuture =
// sch.scheduleWithFixedDelay(delayTask, 5, 5, TimeUnit.SECONDS);
ScheduledFuture<?> periodicFuture = sch.scheduleAtFixedRate(periodicTask, 5, 5, TimeUnit.SECONDS);
}
}
Submission Time: 上午12时26分27秒
periodicTask Execution Time: 上午12时26分32秒
periodicTask End Time: 上午12时26分42秒
periodicTask Execution Time: 上午12时26分42秒
periodicTask End Time: 上午12时26分52秒
periodicTask Execution Time: 上午12时26分52秒
periodicTask End Time: 上午12时27分02秒
periodicTask Execution Time: 上午12时27分02秒
.........
.........
以上是关于异步多线程----执行器(Executor)的主要内容,如果未能解决你的问题,请参考以下文章