Java 8 并行流中的自定义线程池

Posted

技术标签:

【中文标题】Java 8 并行流中的自定义线程池【英文标题】:Custom thread pool in Java 8 parallel stream 【发布时间】:2014-02-05 10:30:59 【问题描述】:

是否可以为 Java 8 parallel stream 指定自定义线程池?我在任何地方都找不到它。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大而且是多线程的,所以我想把它分开。我不希望在另一个模块的 applicationblock 任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数现实世界的情况下安全地使用并行流。

试试下面的例子。有一些 CPU 密集型任务在单独的线程中执行。 这些任务利用并行流。第一个任务被破坏了,所以每一步需要 1 秒(通过线程睡眠模拟)。问题是其他线程卡住并等待中断的任务完成。这是一个人为的例子,但想象一个 servlet 应用程序和某人向共享分叉连接池提交一个长时间运行的任务。

public class ParallelTest 
    public static void main(String[] args) throws InterruptedException 
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    

    private static void runTask(int delay) 
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    

    public static boolean isPrime(long n) 
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    

【问题讨论】:

自定义线程池是什么意思?有一个通用的 ForkJoinPool,但您始终可以创建自己的 ForkJoinPool 并向其提交请求。 提示:Java Champion Heinz Kabutz 检查了同样的问题,但影响更严重:公共分叉连接池的线程死锁。见javaspecialists.eu/archive/Issue223.html 【参考方案1】:

并行流使用默认的ForkJoinPool.commonPool by default has one less threads as you have processors,由Runtime.getRuntime().availableProcessors() 返回(这意味着并行流为调用线程留出一个处理器)。

对于需要单独或自定义池的应用程序,可以使用给定的目标并行度级别构建 ForkJoinPool;默认情况下,等于可用处理器的数量。

这也意味着,如果您有嵌套的并行流或多个并行流同时启动,它们都将共享同一个池。优点:您永远不会使用超过默认值(可用处理器的数量)。缺点:您可能不会将“所有处理器”分配给您启动的每个并行流(如果您碰巧有多个)。 (显然你可以使用ManagedBlocker 来规避它。)

要更改并行流的执行方式,您可以

将并行流执行提交到您自己的 ForkJoinPool:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); 或 您可以使用系统属性更改公共池的大小:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 用于 20 个线程的目标并行度。

后者在我的机器上有 8 个处理器的示例。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> 
    try  Thread.sleep(100);  catch (Exception ignore) 
    System.out.print((System.currentTimeMillis() - start) + " ");
);

输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

因此您可以看到并行流一次处理 8 个项目,即它使用 8 个线程。但是,如果我取消注释注释行,则输出为:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,并行流使用了 20 个线程,并且流中的所有 20 个元素都被并发处理了。

【讨论】:

commonPool 实际上比availableProcessors 少一个,导致总并行度等于availableProcessors,因为调用线程计为一个。 提交返回ForkJoinTask。模仿parallel()需要get()stream.parallel().forEach(soSomething)).get(); 我不相信ForkJoinPool.submit(() -> stream.forEach(...)) 会使用给定的ForkJoinPool 运行我的流操作。我希望整个 Stream-Action 在 ForJoinPool 中作为 ONE 操作执行,但在内部仍使用默认/常见的 ForkJoinPool。您在哪里看到 ForkJoinPool.submit() 会按照您说的做? 我现在看到***.com/a/34930831/1520422 很好地表明它确实像宣布的那样工作。但是我仍然不明白它是如何工作的。但我对“它有效”很好。谢谢! 我建议恢复 Tod Casasent 的编辑,因为 JDK-8190974 中没有任何内容表明 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", …) 将不再工作,并且从 JDK 18 开始,它仍然可以按预期工作。【参考方案2】:

如果您不想依赖实施技巧,总有一种方法可以通过实施自定义收集器来实现相同的目标,这些收集器将结合 mapcollect 语义...而且您不会被限制在 ForkJoinPool :

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

幸运的是,它已经在这里完成并且可以在 Maven Central 上找到: http://github.com/pivovarit/parallel-collectors

免责声明:我写了它并对此负责。

【讨论】:

【参考方案3】:

实际上有一个技巧可以在特定的 fork-join 池中执行并行操作。如果您将它作为一个任务在一个 fork-join 池中执行,它会留在那里并且不使用公共池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try 
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
 catch (InterruptedException | ExecutionException e) 
    throw new RuntimeException(e);
 finally 
    if (forkJoinPool != null) 
        forkJoinPool.shutdown();
    

技巧基于ForkJoinTask.fork,它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者使用ForkJoinPool.commonPool(),如果不是inForkJoinPool()

【讨论】:

这里描述了解决方案的详细信息blog.krecan.net/2014/03/18/… 但它是否还指定流使用ForkJoinPool 或者这是一个实现细节?文档的链接会很好。 @Lukas 感谢您的 sn-p。我要补充一点,当不再需要 ForkJoinPool 实例以避免线程泄漏时,它应该是 shutdown()。 (example) 请注意,Java 8 中存在一个错误,即即使任务在自定义池实例上运行,它们仍然与共享池耦合:计算的大小仍然与公共池成比例,并且不是自定义池。在 Java 10 中已修复:JDK-8190974 @terran 这个问题也已针对 Java 8 bugs.openjdk.java.net/browse/JDK-8224620修复了【参考方案4】:

这是我如何以编程方式设置上面提到的最大线程数标志,并截取代码以验证参数是否得到尊重

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

【讨论】:

【参考方案5】:

您可以尝试实现这个 ForkJoinWorkerThreadFactory 并将其注入 Fork-Join 类。

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) 
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    

你可以使用这个 Fork-Join pool 的构造函数来做到这一点。

注意事项:-- 1.如果你使用这个,考虑到根据你新线程的实现,JVM的调度会受到影响,这通常将fork-join线程调度到不同的核心(视为计算线程)。 2. fork-join 对线程的任务调度不会受到影响。 3. 还没有真正弄清楚并行流是如何挑选线程的 来自 fork-join(找不到合适的文档),所以试试 使用不同的线程命名工厂,以确保如果线程 正在从您的 customThreadFactory 中选择并行流 提供。 4. commonThreadPool 不会使用这个customThreadFactory。

【讨论】:

您能否提供一个有用的示例来演示如何使用您指定的内容?【参考方案6】:

原来的解决方案(设置 ForkJoinPool 通用并行属性)不再有效。查看原始答案中的链接,破坏此问题的更新已被移植回 Java 8。如链接线程中所述,不能保证此解决方案永远有效。基于此,解决方案是在接受的答案中讨论的带有 .get 解决方案的 forkjoinpool.submit。我认为 backport 也解决了此解决方案的不可靠性问题。

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        
            try  Thread.sleep(100);  catch (Exception ignore) 
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        )).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)

    list.add(theInt);

fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        
            try  Thread.sleep(100);  catch (Exception ignore) 
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        )).get();

【讨论】:

当我在调试模式下执行ForkJoinPool.commonPool().getParallelism() 时,我看不到并行度的变化。 谢谢。我做了一些测试/研究并更新了答案。看起来更新改变了它,因为它适用于旧版本。 为什么我总是收到这个:unreported exception InterruptedException; must be caught or declared to be thrown,即使循环中有所有catch 异常。 洛基,我没有看到任何错误。了解 Java 版本和确切的行会有所帮助。 “InterruptedException”表明您的版本中未正确关闭睡眠的 try/catch。 当我执行System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); System.out.println(ForkJoinPool.commonPool().getParallelism()); 时,它会在从JDK 8 到JDK 18 的所有版本上始终打印10。我不知道您为什么声称这个常见的并行属性不起作用;您添加到另一个答案的链接甚至没有远程说明这个属性,它的补丁根本没有触及这个功能。【参考方案7】:

我们可以使用以下属性更改默认并行度:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

可以设置使用更多的并行度。

【讨论】:

虽然是全局设置,但是可以增加parallelStream 与上述相同的人,这对我在 openjdk "11.0.6" 上不起作用 @abbas 适用于我尝试过的所有版本,从 Java 8 到 Java 18。【参考方案8】:

去获取AbacusUtil。可以为并行流指定线程号。下面是示例代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

披露:我是AbacusUtil的开发者。

【讨论】:

【参考方案9】:

如果您不需要自定义的 ThreadPool 但您想限制并发任务的数量,您可以使用:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> 
       // do your processing   
));

(重复的问题要求这个被锁定,所以请在这里忍耐)

【讨论】:

【参考方案10】:

注意: JDK 10 中似乎实现了一个修复程序,可确保自定义线程池使用预期的线程数。

自定义 ForkJoinPool 中的并行流执行应遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974

【讨论】:

【参考方案11】:

我尝试了 custom ForkJoinPool 如下调整池大小:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() 
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> 
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            )
            .reduce(0L, Long::sum);


private static void testForkJoinPool() 
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try 
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

     catch (InterruptedException | ExecutionException e) 
        e.printStackTrace();
     finally 
        if (forkJoinPool != null) 
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        
    
    out.println(result);
    out.println(ThreadNameSet);

这里的输出表明池使用的线程数比默认的 4 多。

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

但实际上有一个怪异,当我尝试使用ThreadPoolExecutor 达到相同的结果时,如下所示:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

但我失败了。

它只会在一个新线程中启动parallelStream,然后其他一切都一样,这再次证明parallelStream将使用 ForkJoinPool 启动它的子线程。

【讨论】:

不允许其他执行者的可能原因是什么? @omjego 这是一个很好的问题,也许您可​​以提出一个新问题并提供更多细节来阐述您的想法;) @omjego 因为它从来都不是预期的功能。 Stream 实现碰巧使用了 Fork/Join 任务,并且不认为这些任务如果从 Fork/Join 池的工作线程中调用,则具有拾取调用者池的特性。即使在今天,这个技巧也没有记录,也没有官方支持。这也是为什么第一个版本不尊重自定义池的并行性而不一致地使用公共池的原因。没有预见到使用不同的池。【参考方案12】:

如果您不介意使用第三方库,您可以使用cyclops-react 在同一管道中混合顺序流和并行流,并提供自定义 ForkJoinPools。例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

或者如果我们希望在顺序流中继续处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[披露我是 cyclops-react 的主要开发者]

【讨论】:

【参考方案13】:

直到现在,我都使用了这个问题的答案中描述的解决方案。现在,我为此创建了一个名为 Parallel Stream Support 的小库:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

但正如@PabloMatiasGomez 在 cmets 中指出的那样,并行流的拆分机制存在缺陷,这在很大程度上取决于公共池的大小。见Parallel stream from a HashSet doesn't run in parallel。

我使用此解决方案只是为了为不同类型的工作设置单独的池,但即使我不使用它,我也无法将公共池的大小设置为 1。

【讨论】:

【参考方案14】:

要测量实际使用的线程数,可以查看Thread.activeCount()

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在 4 核 CPU 上产生如下输出:

5 // common pool
23 // custom pool

没有.parallel() 它给出:

3 // common pool
4 // custom pool

【讨论】:

Thread.activeCount() 不会告诉您哪些线程正在处理您的流。而是映射到 Thread.currentThread().getName(),然后是 distinct()。然后你会意识到,并不是池中的每个线程都会被使用……给你的处理添加一个延迟,池中的所有线程都会被使用。【参考方案15】:

除了在您自己的 forkJoinPool 中触发并行计算的技巧之外,您还可以将该池传递给 CompletableFuture.supplyAsync 方法,如下所示:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

【讨论】:

以上是关于Java 8 并行流中的自定义线程池的主要内容,如果未能解决你的问题,请参考以下文章

如何优雅的自定义 ThreadPoolExecutor 线程池

如何优雅的自定义 ThreadPoolExecutor 线程池

如何优雅的自定义 ThreadPoolExecutor 线程池

8天玩转并行开发——第七天 简要分析任务与线程池

Java线程池实现原理及其在美团业务中的实践

Java线程池实现原理及其在美团业务中的实践