带有 .forEach 和 .add 的 Java ForkJoinPool

Posted

技术标签:

【中文标题】带有 .forEach 和 .add 的 Java ForkJoinPool【英文标题】:Java ForkJoinPool with .forEach and .add 【发布时间】:2021-03-07 03:54:50 【问题描述】:

我有一个TicketDTO 对象列表,其中每个TicketDTO 都需要通过一个函数将数据转换为TicketDataDTO。我在这里想要的是减少运行此代码所需的时间,因为当列表大小较大时,转换它需要很长时间,并且通过 GET 映射获取数据是不可接受的。但是,当我尝试实现 ForkJoinPoolparallelStream)(下面的代码)来完成它时,我的返回 List` 是空的。谁能告诉我我做错了什么?

@Override
public List<TicketDataDTO> getOtrsTickets(String value, String startDate, String endDate, String product, String user) 
  // TODO Implement threads
  List<TicketDTO> tickets = ticketDao.findOtrsTickets(value, startDate, endDate, product, user);
  Stream<TicketDTO> ticketsStream = tickets.parallelStream();
  List<TicketDataDTO> data = new ArrayList<TicketDataDTO>();
  
  ForkJoinPool forkJoinPool = new ForkJoinPool(6);
  
  forkJoinPool.submit(() -> 
    try 
      ticketsStream.forEach(ticket -> data.add(createTicketData(ticket)));
     catch (Exception e) 
      throw new RuntimeException(e);
    
  );
  
  forkJoinPool.shutdown();
  
  //ticketsStream.forEach(ticket -> data.add(createTicketData(ticket)));

  return data;

createTicketData 只是一个带有两个 for 循环和一个 switch 循环的函数,用于创建我需要的一些新列作为输出。

【问题讨论】:

您正在从多个线程修改单个 ArrayList 而不进行任何同步。即使您解决了当前的问题,这也会导致错误。我建议您阅读docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/… 并使用collect(Collectors.toList()) 换成使用无副作用的方法。此外,不支持在并行流中使用自定义 ForkJoinPools 的功能。最好显式调用ExecutorService.submitinvokeAll 来获取期货列表并等待它们。 【参考方案1】:

除了在 ForkJoinPool 上调用 shutdown() 之外,您还必须像这样等待它的终止

forkJoinPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

如果您不等待终止,data 将在线程有机会将其结果添加到它之前返回。

看 How to wait for all threads to finish, using ExecutorService? 更多详情

【讨论】:

以上是关于带有 .forEach 和 .add 的 Java ForkJoinPool的主要内容,如果未能解决你的问题,请参考以下文章

初识java集合——迭代器

Java使用foreach遍历集和时不能add/remove的原因剖析

为什么阿里巴巴Java开发手册中强制要求不要在foreach循环里进行元素的remove和add操作?

Java 中foreach()循环

java foreach

带有索引的Java 8 forEach [重复]