具有批处理功能的 Java 8 Stream

Posted

技术标签:

【中文标题】具有批处理功能的 Java 8 Stream【英文标题】:Java 8 Stream with batch processing 【发布时间】:2015-08-18 22:04:21 【问题描述】:

我有一个包含项目列表的大文件。

我想创建一批项目,用这批项目发出一个 HTTP 请求(所有项目都需要作为 HTTP 请求中的参数)。我可以使用for 循环很容易地做到这一点,但作为 Java 8 爱好者,我想尝试使用 Java 8 的 Stream 框架编写它(并获得延迟处理的好处)。

例子:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) 
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);


if (batch.size() > 0) process(batch);

我想做一些很长的事情 lazyFileStream.group(500).map(processBatch).collect(toList())

最好的方法是什么?

【问题讨论】:

我不太明白如何进行分组,抱歉,Files#lines 会懒惰地读取文件的内容。 所以你基本上需要flatMap 的倒数(+ 一个额外的 flatMap 来再次折叠流)?我认为标准库中不存在这样的方便方法。您要么必须找到第 3 方库,要么根据拆分器和/或发射流的收集器编写自己的库 也许您可以将Stream.generatereader::readLinelimit 结合使用,但问题是流不能很好地处理异常。此外,这可能不能很好地并行化。我认为for 循环仍然是最好的选择。 我刚刚添加了一个示例代码。我不认为 flatMap 是要走的路。怀疑我可能需要编写一个自定义 Spliterator 我为此类问题创造了“流滥用”一词。 【参考方案1】:

注意! 此解决方案在运行 forEach 之前读取整个文件。

您可以使用 jOOλ 来实现,这是一个为单线程、顺序流用例扩展 Java 8 流的库:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> 
       process(batch);
   );

在幕后,zipWithIndex() 只是:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) 
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> 
        long index;

        @Override
        public boolean hasNext() 
            return it.hasNext();
        

        @Override
        public Tuple2<T, Long> next() 
            return tuple(it.next(), index++);
        
    

    return seq(new ZipWithIndex());

... 而groupBy() 是 API 方便:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) 
    return collect(Collectors.groupingBy(classifier));

(免责声明:我为 jOOλ 背后的公司工作)

【讨论】:

哇。这正是我正在寻找的。我们的系统通常按顺序处理数据流,因此非常适合迁移到 Java 8。 请注意,此解决方案不必要地将整个输入流存储到中间 Map(与 Ben Manes 解决方案不同) 确实,确定第一批的结束,初始化整个流并在内部进行缓冲。【参考方案2】:

纯 Java-8 实现也是可能的:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

请注意,与 JOOl 不同,它可以很好地并行工作(前提是您的 data 是一个随机访问列表)。

【讨论】:

如果您的数据实际上是一个流怎么办? (比如说文件中的行,甚至来自网络)。 @OmryYadan,问题是关于来自List 的输入(请参阅问题中的data.size()data.get())。我正在回答提出的问题。如果您还有其他问题,请改为提问(尽管我认为直播问题也已被问到)。 如何并行处理批次?【参考方案3】:

你也可以看看cyclops-react,我是这个库的作者。它实现了 jOOλ 接口(以及扩展的 JDK 8 Streams),但与 JDK 8 Parallel Streams 不同,它专注于异步操作(例如可能阻塞异步 I/O 调用)。相比之下,JDK Parallel Streams 专注于 CPU 绑定操作的数据并行性。它的工作原理是在后台管理基于 Future 的任务的聚合,但向最终用户提供标准的扩展 Stream API。

此示例代码可以帮助您入门

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

有一个tutorial on batching here

还有一个more general Tutorial here

要使用您自己的线程池(这可能更适合阻塞 I/O),您可以开始处理

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();

【讨论】:

【参考方案4】:

你也可以使用RxJava:

RxJava v3:

int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
        .buffer(batchSize)
        .map(batch -> process(batch))
        .blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));

以前的版本:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

【讨论】:

【参考方案5】:

为了完整起见,这里有一个Guava 解决方案。

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

在问题中,集合可用,因此不需要流,它可以写成,

Iterables.partition(data, batchSize).forEach(this::process);

【讨论】:

Lists.partition 是我应该提到的另一个变体。 这很懒,对吧?在处理相关批次之前,它不会将整个 Stream 调用到内存中 @orirab 是的。它在批次之间是惰性的,因为每次迭代都会消耗batchSize 个元素。【参考方案6】:

纯 Java 8 解决方案

我们可以创建一个自定义收集器来优雅地执行此操作,它接收一个batch size 和一个Consumer 来处理每个批次:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> 

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) 
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    

    public Supplier<List<T>> supplier() 
        return ArrayList::new;
    

    public BiConsumer<List<T>, T> accumulator() 
        return (ts, t) -> 
            ts.add(t);
            if (ts.size() >= batchSize) 
                batchProcessor.accept(ts);
                ts.clear();
            
        ;
    

    public BinaryOperator<List<T>> combiner() 
        return (ts, ots) -> 
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        ;
    

    public Function<List<T>, List<T>> finisher() 
        return ts -> 
            batchProcessor.accept(ts);
            return Collections.emptyList();
        ;
    

    public Set<Characteristics> characteristics() 
        return Collections.emptySet();
    

然后可以选择创建一个辅助实用程序类:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils 

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) 
        return new BatchCollector<T>(batchSize, batchProcessor);
    

示例用法:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

我也在 GitHub 上发布了我的代码,如果有人想看看:

Link to Github

【讨论】:

这是一个很好的解决方案,除非您无法将流中的所有元素都放入内存中。此外,它不适用于无休止的流 - collect 方法是终端,这意味着它不会产生批处理流,而是等待流完成,然后分批处理结果。 @AlexAckerman 无限流意味着永远不会调用完成器,但仍会调用累加器,因此仍会处理项目。此外,它只需要在任何时候都在内存中的项目的批量大小。 @Solubris,你是对的!我的错,感谢您指出这一点 - 如果有人对 collect 方法的工作原理有相同的想法,我不会删除参考评论。 应复制发送给消费者的列表以使其修改安全,例如:batchProcessor.accept(copyOf(ts))【参考方案7】:

我为这样的场景编写了一个自定义拆分器。它将从输入流中填充给定大小的列表。这种方法的优点是它会执行惰性处理,并且可以与其他流函数一起使用。

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) 
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());


private static class BatchSpliterator<E> implements Spliterator<List<E>> 

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) 
        this.base = base;
        this.batchSize = batchSize;
    

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) 
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    

    @Override
    public Spliterator<List<E>> trySplit() 
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    

    @Override
    public long estimateSize() 
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    

    @Override
    public int characteristics() 
        return base.characteristics();
    


【讨论】:

真的很有帮助。如果有人想要批量处理一些自定义条件(例如集合大小(以字节为单位)),那么您可以委托您的自定义谓词并在 for 循环中使用它作为条件(恕我直言,while 循环将更具可读性) 我不确定实现是否正确。例如,如果基本流是SUBSIZED,则从trySplit 返回的拆分可能比拆分前有更多的项目(如果拆分发生在批处理中间)。 @Malt 如果我对Spliterators 的理解是正确的,那么trySplit 应该始终将数据分成两个大致相等的部分,这样结果就不会大于原始数据? @BruceHamilton 不幸的是,根据文档,这些部分不能大致相等。它们必须相等:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting. 是的,这和我对Spliterator拆分的理解是一致的。但是,我很难理解“从 trySplit 返回的拆分如何比拆分前拥有更多的项目”,您能否详细说明您的意思?【参考方案8】:

我们有一个类似的问题要解决。我们希望采用大于系统内存的流(遍历数据库中的所有对象)并尽可能随机化顺序 - 我们认为缓冲 10,000 个项目并随机化它们是可以的。

目标是一个接收流的函数。

在此处提出的解决方案中,似乎有多种选择:

使用各种非 java 8 附加库 从不是流的东西开始 - 例如。随机访问列表 有一个可以在拆分器中轻松拆分的流

我们的直觉最初是使用自定义收集器,但这意味着退出流媒体。上面的自定义收集器解决方案非常好,我们几乎使用它。

这是一个作弊的解决方案,它利用 Streams 可以给你一个 Iterator 的事实来作弊,你可以将其用作一个逃生舱,让你做一些流不做的额外事情支持。 Iterator 使用另一位 Java 8 StreamSupport 巫术转换回流。

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> 
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) 
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    

    private static <T> Stream<T> asStream(Iterator<T> iterator) 
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) 
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    

    @Override
    public boolean hasNext() 
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    

    @Override
    public List<T> next() 
        return currentBatch;
    

    private void prepareNextBatch() 
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) 
            currentBatch.add(sourceIterator.next());
        
    

使用它的一个简单示例如下所示:

@Test
public void getsBatches() 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);

以上印刷品

[A, B, C]
[D, E, F]

对于我们的用例,我们想打乱批次,然后将它们保持为流 - 它看起来像这样:

@Test
public void howScramblingCouldBeDone() 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> 
            Collections.shuffle(list); return list; )
        .flatMap(List::stream)
        .forEach(System.out::println);

这会输出类似的东西(它是随机的,每次都不同)

A
C
B
E
D
F

这里的秘诀在于始终存在流,因此您可以对批次流进行操作,或者对每个批次执行某些操作,然后将flatMap 重新返回到流中。更好的是,以上所有内容仅作为最终的forEachcollect 或其他终止表达式PULL通过流中的数据运行。

原来iterator 是流上的一种特殊类型的终止操作,不会导致整个流运行并进入内存!感谢 Java 8 人员的出色设计!

【讨论】:

当每批被收集并坚持到List时,你可以完全迭代它是非常好的——你不能推迟批内元素的迭代,因为消费者可能想要跳过一个整个批次,如果您不消耗这些元素,那么它们就不会跳得太远。 (我已经在 C# 中实现了其中之一,尽管它要容易得多。) 这是这里最好的解决方案之一。完全被低估了。【参考方案9】:

使用 Spliterator 的简单示例

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) 
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true)               
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) 
                break;
            
                   
     catch (IOException e) 
        e.printStackTrace();
    


static class Chunker<T> 
    int ct = 0;
    public void doSomething(T line) 
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) 
            System.out.println("====================chunk=====================");               
                   
           

布鲁斯的回答更全面,但我一直在寻找快速而肮脏的东西来处理一堆文件。

【讨论】:

【参考方案10】:

纯 Java 8 示例也适用于并行流。

使用方法:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

方法声明与实现:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)

    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> 
        List<ElementType> fullBatch;

        synchronized (newBatch)
        
            if (newBatch.size() < batchSize)
            
                newBatch.add(element);
                return;
            
            else
            
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            
        

        batchProcessor.accept(fullBatch);
    );

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));

【讨论】:

【参考方案11】:

这是一个延迟评估的纯 java 解决方案。

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize)
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>()
          public List<T> apply(T t)
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            
      ), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);

【讨论】:

【参考方案12】:

使用Java 8com.google.common.collect.Lists,您可以执行以下操作:

public class BatchProcessingUtil 
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) 
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    

这里T是输入列表中项目的类型,U是输出列表中项目的类型

你可以这样使用它:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);

【讨论】:

【参考方案13】:

您可以使用 apache.commons :

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

分区部分是非惰性完成的,但在列表分区后,您将获得使用流的好处(例如,使用并行流、添加过滤器等)。 其他答案提出了更复杂的解决方案,但有时可读性和可维护性更重要(有时它们不是:-))

【讨论】:

不确定谁投了反对票,但很高兴能理解为什么。我给出的答案补充了无法使用番石榴的人的其他答案 你在这里处理的是一个列表,而不是一个流。 @Drakemor 我正在处理子列表流。注意 stream() 函数调用 但是首先你把它变成一个子列表的列表,这对于 true 流数据将无法正常工作。这里是对分区的引用:commons.apache.org/proper/commons-collections/apidocs/org/… TBH 我不完全理解你的论点,但我想我们可以同意不同意。我已经编辑了我的答案以反映我们在这里的对话。感谢讨论【参考方案14】:

这可以很容易地使用 Reactor:

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);

【讨论】:

【参考方案15】:

平心而论,看看优雅的Vavr 解决方案:

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);

【讨论】:

以上是关于具有批处理功能的 Java 8 Stream的主要内容,如果未能解决你的问题,请参考以下文章

Java 8中处理集合的优雅姿势——Stream

Java 8中处理集合的优雅姿势——Stream

java 8 stream学习整理

简洁方便的集合处理——Java 8 stream流

Java 8 Stream

Java 8新特性stream API用法总结