Java8 - 处理 Stream<Callable<...>> 的惯用方式,并行交付给非线程安全的消费者?

Posted

技术标签:

【中文标题】Java8 - 处理 Stream<Callable<...>> 的惯用方式,并行交付给非线程安全的消费者?【英文标题】:Java8 - idiomatic way to process a Stream<Callable<...>> in parallel delivering to a non-thread-safe consumer? 【发布时间】:2017-05-15 16:57:19 【问题描述】:

假设我有一个Stream&lt;Callable&lt;SomeClass&gt;&gt; stream;。该流正在访问超过一百万个无法放入内存的对象。

将其转换为Stream&lt;SomeClass&gt; 的惯用方法是什么,以确保Callable::call 在交付给非线程安全的使用者之前并行执行(可能通过调用.sequential().forEach() 或其他一些瓶颈机制)?

即并行处理流,但按顺序传递输出(随机顺序可以,只要它是单线程的)。

我知道我可以通过在原始流和消费者之间设置ExecutionServiceQueue 来做我想做的事。但这似乎是很多代码,有没有神奇的单行代码?

【问题讨论】:

对于.parallel().sequential(),最后一次调用决定了整个流操作的工作方式。你不能让一些操作是并行的,而一些不是。但坦率地说,我希望 .parallel().map(Callable::call) 能够正常工作并做正确的事情。 在这种情况下你应该只使用 forEachOrdered。 @LouisWasserman map(Callable::call) 不会编译失败,因为该方法被声明为抛出异常? 相关:forEach vs forEachOrdered in Java 8 Stream 在这种情况下,“顺序执行”不等于“单线程执行”。 forEachOrdered 将与并行流一起工作,同时最终仍会随后输出元素,但不是单线程意义上的 - 它确保元素上的操作之间的发生前关系仍然可以由不同的线程;讨论了here。 【参考方案1】:

您仍然可以使用ExecutorService 进行并行化。像这样:

ExecutorService service = Executors.newFixedThreadPool(4);
stream.map(c -> service.submit(c)).map(future -> 
    try                
        return future.get(); //retrieve callable result
     catch (InterruptedException | ExecutionException ex)         
        //Exception handling    
        throw new RuntimeException(ex);         
    
);

您可以进一步按顺序处理生成的Stream&lt;SomeClass&gt;

如果您直接在 Stream&lt;Future&lt;SomeClass&gt;&gt; 上使用 forEach/forEachOrdered,您可以在当前未来完成后直接处理生成的 SomeClass-object(与使用 invokeAll() 时不同,它会阻塞直到每个任务完成)。

如果您想按照可用的确切顺序处理可调用对象的结果,则必须使用CompletionService,由于必须调用@987654328,它不能与单个流操作链一起使用@ 提交调用后。

编辑:

在流中使用ExecutorService 不能像我上面展示的那样工作,因为每个Callable 都是通过future.get() 一个接一个地提交和请求的。

我发现了一个可能的副作用更大的解决方案,将Callables 划分为固定的并行块。

我使用TaskMapper 类作为映射函数来提交Callables 并将它们映射到块:

class TaskMapper implements Function<Callable<Integer>, List<Future<Integer>>>
    private final ExecutorService service;
    private final int chunkSize;
    private List<Future<Integer>> chunk = new ArrayList<>(); 

    TaskMapper(ExecutorService service, int chunkSize)
        this.service = service;
        this.chunkSize = chunkSize;
    

    @Override
    public List<Future<Integer>> apply(Callable<Integer> c) 
        chunk.add(service.submit(c));
        if(chunk.size() == chunkSize)
            List<Future<Integer>> fList = chunk;
            chunk = new ArrayList<>();              
            return fList;
        else
            return null;
        
    

    List<Future<Integer>> getChunk()
        return chunk;
    

流操作链如下所示:

ExecutorService service = Executors.newFixedThreadPool(4);
TaskMapper taskMapper = new TaskMapper(service, 4);
stream.map(taskMapper)
    .filter(fl -> fl != null) //filter for the chunks
    .flatMap(fl -> fl.stream()) //flat-map the chunks to futures
    .map(future -> 
        try                
            return future.get();
         catch (InterruptedException | ExecutionException ex)     
            throw new RuntimeException(ex);
        
    );  
//process the remaining futures  
for(Future<Integer> f : taskMapper.getChunk())
    try                
        Integer i = f.get();
        //process i
     catch (InterruptedException | ExecutionException ex)     
        //exception handling
    

它的工作原理如下:TaskMapper 每次将 4 个可调用对象提交到服务并将它们映射到一大块期货(没有 Spliterator)。这可以通过每次映射到null 来解决第一个、第二个和第三个可调用的问题。例如,null 可以替换为虚拟对象。将 future 映射到结果的映射函数等待块的每个 future 的结果。我在示例中使用Integer 而不是SomeClass。当当前 chunk 中的 future 的所有结果都被映射后,一个新的 chunk 将被创建并并行化。最后,如果流中的元素数量不能被chunkSize(在我的示例中为4)整除,则必须从TaskMapper 中检索剩余的期货并在流之外进行处理。

此构造适用于我执行的测试,但我知道由于副作用、状态完整性和流的未定义评估行为,它可能很脆弱。

EDIT2:

我使用自定义 Spliterator 制作了上一个 EDIT 中的构造版本:

public class ExecutorServiceSpliterator<T> extends AbstractSpliterator<Future<T>>
    private final Spliterator<? extends Callable<T>> srcSpliterator;
    private final ExecutorService service;
    private final int chunkSize;
    private final Queue<Future<T>> futures = new LinkedList<>();

    private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator) 
        this(srcSpliterator, Executors.newFixedThreadPool(8), 30); //default
    

    private ExecutorServiceSpliterator(Spliterator<? extends Callable<T>> srcSpliterator, ExecutorService service, int chunkSize) 
        super(Long.MAX_VALUE, srcSpliterator.characteristics() & ~SIZED & ~CONCURRENT);
        this.srcSpliterator = srcSpliterator;
        this.service = service;
        this.chunkSize = chunkSize;
    

    public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream)
        return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator()));
    

    public static <T> Stream<T> pipeParallelized(Stream<? extends Callable<T>> srcStream, ExecutorService service, int chunkSize)
        return getStream(new ExecutorServiceSpliterator<>(srcStream.spliterator(), service, chunkSize));
    

    private static <T> Stream<T> getStream(ExecutorServiceSpliterator<T> serviceSpliterator)
        return StreamSupport.stream(serviceSpliterator, false)
            .map(future -> 
                try                
                    return future.get();
                 catch (InterruptedException | ExecutionException ex)     
                    throw new RuntimeException(ex);
                
            
        );
    

    @Override
    public boolean tryAdvance(Consumer<? super Future<T>> action) 
        boolean didAdvance = true;
        while((didAdvance = srcSpliterator.tryAdvance(c -> futures.add(service.submit(c))))
                && futures.size() < chunkSize);
        if(!didAdvance)
            service.shutdown();
        

        if(!futures.isEmpty())
            Future<T> future = futures.remove();
            action.accept(future);
            return true; 
        
        return false;
               


这个类提供了函数 (pipeParallelized()),它接受 Callable 元素的流,以并行方式逐块执行它们,然后输出包含结果的顺序流。 Spliterators 被允许是有状态的。因此,这个版本应该不会违反任何流操作约束。这就是Splitterator 的使用方式(接近“魔术单线”):

ExecutorServiceSpliterator.pipeParallelized(stream);

这一行采用Callables stream 的流并行化它们的执行并返回一个包含结果的顺序流(管道延迟发生 -> 应该与数百万个可调用对象一起工作)可以通过常规流操作进一步处理.

ExecutorServiceSpliterator的实现非常基础。它应该主要展示如何在原则上做到这一点。可以优化服务的再供应和结果的检索。例如,如果允许生成的流是无序的,则可以使用 CompletionService

【讨论】:

catch 块中的正确做法是将捕获的异常包装在运行时异常中,而不是用 null 污染 Stream。 如果流是按顺序处理的,它会将每个可调用对象一个一个映射到一个未来,然后立即等待刚刚创建的未来并在处理下一个可调用对象之前处理其结果。此解决方案不会进行并行处理。 @DidierL 感谢您指出这一点。我重新设计了解决方案,使其具有实际的并行处理功能。它似乎按预期工作,但它依赖于流实现可能无法保证满足的某些条件。【参考方案2】:

您要求的是惯用的解决方案。不鼓励在其行为参数中具有副作用的流(在 Stream 的 javadoc 中明确说明)。

所以惯用的解决方案基本上是 ExecutorService + Futures 和一些循环/forEach()。如果你有一个 Stream 作为参数,只需使用标准 Collector 将其转换为 List。

类似的东西:

    ExecutorService service = Executors.newFixedThreadPool(5);
    service.invokeAll(callables).forEach( doSomething );
    // or just
    return service.invokeAll(callables);

【讨论】:

编辑:流正在访问超过一百万个不适合内存的对象。所以我无法将流转换为invokeAll 的列表。 其实这个方案真正的问题是forEach()是并行执行的,而我的doSomething必须是单线程的。【参考方案3】:

第一个例子:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    () -> "job1", 
    () -> "job2",  
    () -> "job3");

executor.invokeAll(callables).stream().map(future -> 
    return future.get();
).forEach(System.out::println);

第二个例子:

Stream.of("1", "2", "3", "4", "", "5")
      .filter(s->s.length() > 0)
      .parallel()
      .forEachOrdered(System.out::println);

【讨论】:

编辑:流正在访问超过一百万个不适合内存的对象。所以我无法将流转换为invokeAll 的列表。 这会有帮助吗? Stream.of("1", "2", "3", "4", "", "5").filter(s->s.length() > 0).parallel().forEachOrdered(System. out::println);【参考方案4】:
    public static void main(String[] args) 
            testInfititeCallableStream();
        
        private static void testInfititeCallableStream() 
            ExecutorService service = Executors.newFixedThreadPool(100);
            Consumer<Future<String>> consumeResult = (Future<String> future)->
                try 
                    System.out.println(future.get());
                 catch (InterruptedException | ExecutionException  e) 
                    e.printStackTrace();
                 
            ;
        getCallableStream().parallel().map(callable -> service.submit(callable)).forEach(consumeResult);   

        
    private static Stream<Callable<String>> getCallableStream() 
            Random randomWait = new Random();
            return Stream.<Callable<String>>generate(() -> 
new Callable<String>() 
                public String call() throws Exception 
                    //wait for testing
                    long time = System.currentTimeMillis();
                    TimeUnit.MILLISECONDS.sleep(randomWait.nextInt(5000));
                    return time + ":" +UUID.randomUUID().toString();
                ;
            ).limit(Integer.MAX_VALUE);
        

【讨论】:

【参考方案5】:

其他答案都不适合我。

我终于确定了这样的东西(伪代码):

ExecutorService executor = Executors.newWorkStealingPool();
CompletionService completor = new CompletionService(executor);
int count = stream.map(completor::submit).count();
while(count-- > 0) 
  SomeClass obj = completor.take();
  consume(obj);

consume(obj) 循环在单个线程中按顺序执行,而各个可调用任务通过 CompletionService 的多个线程异步工作。内存消耗是有限的,因为CompletionService 一次只能处理与可用线程一样多的项目。等待执行的 Callable 会从流中急切地具体化,但与每个开始执行时消耗的内存相比,其影响可以忽略不计(您的用例可能会有所不同)。

【讨论】:

以上是关于Java8 - 处理 Stream<Callable<...>> 的惯用方式,并行交付给非线程安全的消费者?的主要内容,如果未能解决你的问题,请参考以下文章

Java8新特性Stream流的使用

Java8新特性Stream流的使用

Java8新特性Stream流的使用

Java8新特性Stream流的使用

关于java8(Stream)的一些用法

具有批处理功能的 Java 8 Stream