将 threadpoolexecutor 传递给 CompletableFuture 的方法都有哪些?

Posted

技术标签:

【中文标题】将 threadpoolexecutor 传递给 CompletableFuture 的方法都有哪些?【英文标题】:What are the ways to pass threadpoolexecutor to CompletableFuture?将 threadpoolexecutor 传递给 CompletableFuture 的方法有哪些? 【发布时间】:2019-10-05 21:46:39 【问题描述】:

我最近一直在研究 Java CompletableFuture,发现我们应该始终使用自定义的线程池。有了它,我发现了两种将线程池传递给现有代码的方法。如下所示

这是我在配置文件中的线程池

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() 
  return new ThreadPoolTaskExecutor();

1.在参数中传递现有线程池。

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);

2。像下面这样使用异步

@Async("commonThreadPool")
public void executeTask() 
// Execute Some Task

是否有任何第三种方法可以编写 CompletableFuture 处理程序或在可以传递自定义线程池的单个位置覆盖其现有行为。之后,无论我在哪里使用下面的代码,它都应该选择我现有的 ThreadPool 而不是 forkJoin 池。

 CompletableFuture.runAsync(() -> executeTask());

【问题讨论】:

@Async 有什么问题? 【参考方案1】:

我强烈建议不要这样做,但如果你真的愿意,你可以使用反射来更改可完成未来使用的线程池。

public static void main(String[] args) throws Exception 
    // Prints ForkJoinPool.commonPool-worker-1
    CompletableFuture<Void> c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();

    setFinalStatic(CompletableFuture.class.getDeclaredField("asyncPool"), Executors.newFixedThreadPool(10));

    // Prints pool-1-thread-1
    c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();


static void setFinalStatic(Field field, Object newValue) throws Exception 
    field.setAccessible(true);
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue);

setFinalStatic 取自https://***.com/a/3301720/1398418

【讨论】:

谢谢@Oleg,但是,我正在寻找任何现有的 Spring 实现的东西,或者 Spring 给了我们任何选择来做它? “Spring 实现的东西”是什么意思?春天有@Async。当你做CompletableFuture.runAsync(() -&gt; executeTask());时,你只是在调用一个java标准库类的方法,它与Spring无关。 我的意思是说任何 Spring 现有的已实现配置,我们可以在应用设置期间传递现有的 ThreadPool 一次,它将在每个 CompletableFuture 内部使用,而不是 CommonForkJoin。 那么就按照我在应用启动时的建议去做吧。 您也可以通过aop 实现您的目标。你需要aspectj,spring aop 是不够的。【参考方案2】:

没有标准方法可以替换所有CompletableFuture 实例的默认执行程序。但是从 Java 9 开始,您可以为子类定义默认执行程序。例如。与

public class MyCompletableFuture<T> extends CompletableFuture<T> 
    static final Executor EXEC = r -> 
        System.out.println("executing "+r);
        new Thread(r).start();
    ;

    @Override
    public Executor defaultExecutor() 
        return EXEC;
    

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() 
        return new MyCompletableFuture<>();
    

    public static CompletableFuture<Void> runAsync​(Runnable runnable) 
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> 
            runnable.run();
            return null;
        );
    

    public static <U> CompletableFuture<U> supplyAsync​(Supplier<U> supplier) 
        return new MyCompletableFuture<U>().completeAsync(supplier);
    

您完成了为MyCompletableFuture 的所有链接阶段定义默认执行程序的所有必要步骤。 EXEC 中的执行器仅用作示例,使用时会生成打印输出,因此当您使用该示例类时,例如

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

它将打印

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST

【讨论】:

以上是关于将 threadpoolexecutor 传递给 CompletableFuture 的方法都有哪些?的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolExecutor线程池execute和submit的区别

线程池ThreadPoolExecutor源码分析

ThreadPoolExecutor源码中的适配器模式

彻底停止运行线程池ThreadPoolExecutor

将 InheritableThreadLocal 与 ThreadPoolExecutor - 或 - 不重用线程的 ThreadPoolExecutor 一起使用

删除 ThreadPoolExecutor 的所有排队任务