并行编程(Future)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并行编程(Future)相关的知识,希望对你有一定的参考价值。

参考技术A 说到并行,常见的几种模式 “回调驱动(多线程环境下)”、“消息/事件驱动(Actor模型中)。
回调是最常见的异步并发模式,它有即时性高、接口设计简单等有点。但相对于Future,其缺点也非常明显。
首先,多线程环境下的回调一般是在触发回调的模块线程中执行的,这就意味着编写回调方法时通常必须考虑线程互斥问题
其次,回调方式接口的提供者在本模块的线程中执行用户应用的回调也是相对不安全的,因为你无法确定它会花费多长时间或出现什么异常,从而可能间接导致本模块的即时性和可靠性受影响;
再者,使用回调接口不利于顺序流程的开发,因为回调方法的执行是孤立的,要与正常流程汇合是比较困难的。因此回调接口适合于在回调中只需要完成简单任务,并且不必与其它流程汇合的场景。

Future对象本身可以看作是一个显式的引用,一个对异步处理结果的引用,可以通过调用Future.isDone()判断引用的对象是否就绪,并采取不同的处理;而后一种情况则只需调用get()或
get(long timeout, TimeUnit unit)通过同步阻塞方式等待对象就绪。实际运行期是阻塞还是立即返回就取决于get()的调用时机和对象就绪的先后了.

除了上面提到的基础形态之外,Future还有丰富的衍生变化,这里就列举几个常见的。

与一般的Future不同,Lazy Future在创建之初不会主动开始准备引用的对象,而是等到请求对象时才开始相应的工作。因此,Lazy Future本身并不是为了实现并发,而是以节约不必要的运算资源为出发点,效果上与Lambda/Closure类似。例如设计某些API时,你可能需要返回一组信息,而其中某些信息的计算可能会耗费可观的资源。但调用者不一定都关心所有的这些信息,因此将那些需要耗费较多资源的对象以Lazy Future的形式提供,可以在调用者不需要用到特定的信息时节省资源。

另外Lazy Future也可以用于避免过早的获取或锁定资源而产生的不必要的互斥。

Promise可以看作是Future的一个特殊分支,常见的Future一般是由服务调用者直接触发异步处理流程,比如调用服务时立即触发处理或 Lazy Future的取值时触发处理。但Promise则用于显式表示那些异步流程并不直接由服务调用者触发的情景。例如Future接口的定时控制,其异步流程不是由调用者,而是由系统时钟触发,再比如淘宝的分布式订阅框架提供的Future式订阅接口,其等待数据的可用性不是由订阅者决定,而在于发布者何时发布或更新数据。因此,相对于标准的Future,Promise接口一般会多出一个set()或fulfill()接口。

常规的Future是一次性的,也就是说当你获得了异步的处理结果后,Future对象本身就失去意义了。但经过特殊设计的Future也可以实现复用,这对于可多次变更的数据显得非常有用。例如前面提到的淘宝分布式订阅框架所提供的Future式接口,它允许多次调用waitNext()方法(相当于Future.get()),每次调用时是否阻塞取决于在上次调用后是否又有数据发布,如果尚无更新,则阻塞直到下一次的数据发布。这样设计的好处是,接口的使用者可以在其任何合适的时机,或者直接简单的在独立的线程中通过一个无限循环响应订阅数据的变化,同时还可兼顾其它定时任务,甚至同时等待多个Future。简化的例子如下:

for (;;)
schedule = getNextScheduledTaskTime();
while(schedule > now())
try
data = subscription.waitNext(schedule - now());
processData(data);
catch(Exception e) ...

doScheduledTask();


【原文来自: http://www.cnblogs.com/uptownBoy/articles/1772483.html

Java异步编程Future应用

目录


1 Future接口介绍

此时有的人会说,对于任务并行需求,直接通过多线程实现不就可以了, 要注意,对于多线程的实现,java提供了三种方式:继承Thread类、实现Runnable接口和实现Callable接口。
但是业务代码在执行时会考虑执行顺序的问题,直接基于这些方式实现多线程会出现两个问题:
1)要想控制线程执行顺序,会通过join()等待线程结束,那这样的话又回归到了阻塞式调用的思路上,违背了并行的需求。 另外还可以通过wait()、notify()、notifyAll()结合状态变量实现,但实现起来过于复杂。
2)线程执行完之后,要想获取线程执行结果,还要用过共享变量或线程间通信等方式来获取,同样过于复杂。为了解决上述问题,Java5中推出了Future,其初衷就是用于构建复杂并行操作。内部方法在返回时,不是返回一个值,而是返回Future对象。其本质是在执行主业务的同时,异步的执行其他分业务,从而利用原本需要同步执行时的等待时间去执行其他的业务,当需要获取其结果时,再进行获取

Java官网对于Future的描述:

Future表示异步计算的结果。 提供了一些方法来检查计算是否完成,等待其完成以及检索计算结果。 只有在计算完成后才可以使用get方法检索结果,必要时将其阻塞,直到准备就绪为止。 取消通过cancel方法执行。 提供了其他方法来确定任务是正常完成还是被取消。 一旦计算完成,就不能取消计算。

在Future接口中有五个抽象方法:

cancel():取消任务, 取消成功返回true;入参mayInterruptIfRunning表示是否允许取消正在执行中的任务。


isCancelled():返回布尔值,代表是否取消成功。


isDone():返回布尔值,代表是否执行完毕。


get():返回Future对象,获取执行结果,如果任务没有完成会阻塞到任务完成再返回。

2 Future应用

Future的使用通常需要配合ExecutorService和Callable一起
使用,使用示例如下:

public class FutureAsyncDemo 
  static Random random = new Random();
  static ExecutorService executor =
Executors.newCachedThreadPool();
  //接收文章名称,获取并计算文章分数
  public static int getArticleScore(String
aname)
    Future<Integer> futureA =
executor.submit(new
CalculateArticleScoreA());
    Future<Integer> futureB =
executor.submit(new
CalculateArticleScoreA());
    Future<Integer> futureC =
executor.submit(new
CalculateArticleScoreA());
    doSomeThingElse();
    Integer a = null;
    try 
      a = futureA.get();
    catch (InterruptedException e) 
      futureA.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureA.cancel(true);
       e.printStackTrace();
   
    Integer b = null;
    try 
      b = futureB.get();
    catch (InterruptedException e) 
      futureB.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureB.cancel(true);
      e.printStackTrace();
   
    Integer c = null;
    try 
      c = futureC.get();
    catch (InterruptedException e) 
      futureC.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureC.cancel(true);
      e.printStackTrace();
   
    executor.shutdown();
    return a+b+c;
 
  private static void doSomeThingElse() 
    System.out.println("exec other
things");
 
  public static void main(String[] args) 
  
 System.out.println(getArticleScore("demo"))
;
 

class CalculateArticleScoreA implements
Callable<Integer>
  @Override
  public Integer call() throws Exception 
    //业务代码
    Random random = new Random();
    TimeUnit.SECONDS.sleep(3);
  
 System.out.println(Thread.currentThread().g
etName());
    return random.nextInt(100);
 

执行结果

exec other things
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
159

上述方法改造了calculateArticleScore(),在其内部基于线程池调用重写了Callable接口中的call(),并在call()中对具体业
务完成编码,并且让其在执行时睡三秒钟。根据结果可以看到,先调用了计算文章分数方法,其内部开启了子线程去执行任务,并且子线程在执行时,并没有阻塞主线程的执行。当主线程需要结果时,在通过返回的Future来获取子任务中的返回值。

3 Future并行变串行问题解析

刚才已经基于Future演示了并行执行的效果,已经达到了期望,但是在使用的过程中,其实还有个坑需要说明。对于
Future的使用,如稍加不注意,就会让并行变为串行。
示例代码如下:

public class FutureAsyncDemo 
  static ExecutorService executor =
Executors.newCachedThreadPool();
  //接收文章名称,获取并计算文章分数
  public static int getArticleScore(String
aname)
    Future<Integer> futureA =
executor.submit(new
CalculateArticleScoreA());
    Future<Integer> futureB =
executor.submit(new
CalculateArticleScoreB());
     Future<Integer> futureC =
executor.submit(new
CalculateArticleScoreC());
    doSomeThingElse();
    Integer a = 0;
    try 
      a = futureA.get();
    catch (InterruptedException e) 
      futureA.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureA.cancel(true);
      e.printStackTrace();
   
    Integer b = 0;
    try 
      b = futureB.get();
    catch (InterruptedException e) 
      futureB.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureB.cancel(true);
      e.printStackTrace();
   
    Integer c = 0;
    try 
      c = futureC.get();
    catch (InterruptedException e) 
       futureC.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureC.cancel(true);
      e.printStackTrace();
   
    executor.shutdown();
    return a+b+c;
 
  private static void doSomeThingElse() 
    System.out.println("exec other
things");
 
  public static void main(String[] args) 
  
 System.out.println(getArticleScore("demo"))
;
 

class CalculateArticleScoreA implements
Callable<Integer>
  @Override
  public Integer call() throws Exception 
    Random random = new Random();
    TimeUnit.SECONDS.sleep(10);
   
 System.out.println(Thread.currentThread().g
etName());
    return random.nextInt(100);
 

class CalculateArticleScoreB implements
Callable<Integer>
  @Override
  public Integer call() throws Exception 
    Random random = new Random();
    TimeUnit.SECONDS.sleep(20);
  
 System.out.println(Thread.currentThread().g
etName());
    return random.nextInt(100);
 

class CalculateArticleScoreC implements
Callable<Integer>
  @Override
  public Integer call() throws Exception 
    Random random = new Random();
    TimeUnit.SECONDS.sleep(30);
  
 System.out.println(Thread.currentThread().g
etName());
    return random.nextInt(100);
 
 

上述代码加计算得分方法复制出来两份,各自休眠10秒、20秒、30秒。当方法返回Future之后,调用get()进行值获取时,发现每次调用时都需要进行等待。这样可以发现,之前的并行现在变成了串行了!!!! 这个问题为什么会产生呢?需要看一下Future中对于get()的介绍

根据源码可知,当调用get()时,其会等待对应方法执行完毕后,才会返回结果,否则会一直等待。因为这个设定,所以上述代码则出现并行变串行的效果。
对于这个问题的解决,可以调用get()的重载,get(longtimeout, TimeUnit unit)。设置等待的时长,如果超时则抛出TimeoutException。

使用示例如下:

public class FutureAsyncDemo 
  static Random random = new Random();
  static ExecutorService executor =
Executors.newCachedThreadPool();
  //接收文章名称,获取并计算文章分数
   public static int
getArticleScore(String aname)
    Future<Integer> futureA =
executor.submit(new
CalculateArticleScoreA());
    Future<Integer> futureB =
executor.submit(new
CalculateArticleScoreB());
    Future<Integer> futureC =
executor.submit(new
CalculateArticleScoreC());
    doSomeThingElse();
    Integer a = 0;
    try 
      a = futureA.get();
    catch (InterruptedException e) 
      futureA.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureA.cancel(true);
      e.printStackTrace();
   
    Integer b = 0;
    try 
       b = futureB.get(3L,
TimeUnit.SECONDS);
    catch (TimeoutException e) 
      e.printStackTrace();
   
    catch (InterruptedException e) 
      futureB.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureB.cancel(true);
      e.printStackTrace();
   
    Integer c = 0;
    try 
      c = futureC.get();
    catch (InterruptedException e) 
      futureC.cancel(true);
      e.printStackTrace();
    catch (ExecutionException e) 
      futureC.cancel(true);
      e.printStackTrace();
   
    executor.shutdown();
    return a+b+c;
 
  private static void doSomeThingElse() 
     System.out.println("exec other
things");
 
  public static void main(String[] args)

  
 System.out.println(getArticleScore("demo")
);
 

class CalculateArticleScoreA implements
Callable<Integer>
  @Override
  public Integer call() throws Exception

    Random random = new Random();
    TimeUnit.SECONDS.sleep(10);
  
 System.out.println(Thread.currentThread().
getName());
    return random.nextInt(100);
 

class CalculateArticleScoreB implements
Callable<Integer>
  @Override
  public Integer call() throws Exception

    Random random = new Random();
    TimeUnit.SECONDS.sleep(20);
  
 System.out.println(Thread.currentThread().
getName());
    return random.nextInt(100);
 

class CalculateArticleScoreC implements
Callable<Integer>
  @Override
  public Integer call() throws Exception

    Random random = new Random();
    TimeUnit.SECONDS.sleep(30);
  
 System.out.println(Thread.currentThread().
getName());
    return random.nextInt(100);
 

在上述方法中,对于B的get()设置了超时时间三秒钟,如果当调用其获取返回值时,如果超过三秒仍然没有返回结果,则抛出超时异常,接着方法会再次向下运行。

对于Future来说,它能够支持任务并发执行,对于任务结果的获取顺序是按照提交的顺序获取,在使用的过程中建议通过CPU高速轮询的方式获取任务结果,但这种方式比较耗费资源。不建议使用

以上是关于并行编程(Future)的主要内容,如果未能解决你的问题,请参考以下文章

Java异步编程Future应用

java~并行计算~Future和Callable实现大任务的并行处理

使用 c++ async 进行并行编程

并发编程基础

并行设计模式-- Master-Worker模式

异步编程利器:CompletableFuture详解