为啥原始流没有收集(收集器)?

Posted

技术标签:

【中文标题】为啥原始流没有收集(收集器)?【英文标题】:Why don't primitive Stream have collect(Collector)?为什么原始流没有收集(收集器)? 【发布时间】:2015-07-30 09:40:28 【问题描述】:

我正在为新手程序员编写一个库,因此我试图让 API 尽可能干净。

我的库需要做的一件事是对大量 int 或 long 集合执行一些复杂的计算。我的用户需要从中计算这些值的场景和业务对象有很多,所以我认为最好的方法是使用流来允许用户将业务对象映射到 IntStreamLongStream,然后计算里面的计算的收藏家。

但是 IntStream 和 LongStream 只有 3 个参数的 collect 方法:

collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R,R> combiner)

并且没有Stream&lt;T&gt; 拥有的更简单的collect(Collector) 方法。

所以不是能够做到

Collection<T> businessObjs = ...
MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( new MyComplexComputation(...));

我必须像这样提供供应商、累加器和组合器:

MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( 
                                  ()-> new MyComplexComputationBuilder(...),
                                  (builder, v)-> builder.add(v),
                                  (a,b)-> a.merge(b))
                              .build(); //prev collect returns Builder object

这对我的新手用户来说太复杂了,而且很容易出错。

我的工作是制作以IntStreamLongStream 作为输入的静态方法,并为您隐藏收集器的创建和执行

public static MyResult compute(IntStream stream, ...)
       return .collect( 
                        ()-> new MyComplexComputationBuilder(...),
                        (builder, v)-> builder.add(v),
                        (a,b)-> a.merge(b))
               .build();

但这不符合使用 Streams 的常规约定:

IntStream tmpStream = businessObjs.stream()
                              .mapToInt( ... );

 MyResult result = MyUtil.compute(tmpStream, ...);

因为您必须保存一个临时变量并将其传递给静态方法,或者在静态调用中创建 Stream,当它与我的计算的其他参数混合时可能会造成混淆。

在使用IntStreamLongStream 的同时,是否有更简洁的方法来执行此操作?

【问题讨论】:

不幸的是,我的建议是使用Stream&lt;Integer&gt;。您可以通过mapToObj(Function.identity())IntStream 获取它。 如果编译器可以内联从转换到消费者的代码路径,它可能能够消除装箱/拆箱。只需像使用 IntStream 一样使用基于 int 的接口编写它们,看看它是否会产生任何垃圾。 @DmitryGinzburg, IntStream#boxed() 提供相同的功能。 好吧,我们也不应该对编译器优化抱有太大的信心。 对你有好处。对我来说,它主要是基于信仰的——我相信我可以这样写,因为编译器可以优化它;我相信我不应该那样做,因为编译器不太可能优化它。 - 我懒得去测试实施策略的每一个排列。 【参考方案1】:

我们实际上做了一些Collector.OfXxx 专业化的原型。我们发现——除了更专业化类型的明显烦恼之外——如果没有完整的原始专业化集合(如 Trove 所做的,或 GS-Collections,但 JDK 所做的),这并不是非常有用没有)。例如,如果没有 IntArrayList,Collector.OfInt 只会将装箱推到其他地方——从收集器到容器——这并没有什么大的胜利,而且还有更多的 API 表面。

【讨论】:

所以你不认为会有性能差异吗?如果我使用IntStream,我不知道会有任何拳击 您正在查看流的错误端。 IntStream 在操作整数时不进行装箱。但是你可以在没有装箱的情况下将 int 放入什么结果容器中?不是 ArrayList 或 HashSet 或 ... Collector.OfInt 如果没有一组丰富的对 int 友好的东西来收集,就没有用处。 我认为我的用例将主要使用返回整数的映射函数,如 stream.mapToInt(String::getLength) 也不应该有任何装箱 @dkatzel 对,这里的 IntStream 和朋友们的重点是能够不对每个操作进行装箱,你确实明白了。如果我们有原始友好的集合,那么 Collector.OfXxx 会更有意义,但我们没有这些,所以我们停止了专业化产生实用价值的地方。 @bayou.io 你不必被说服。问题是“他们为什么不这样做”,答案是“我们对其进行了原型设计,并得出结论认为努力、成本、复杂性不存在回报。”【参考方案2】:

也许如果使用方法引用而不是 lambda,原始流收集所需的代码似乎不会那么复杂。

MyResult result = businessObjs.stream()
                              .mapToInt( ... )
                              .collect( 
                                  MyComplexComputationBuilder::new,
                                  MyComplexComputationBuilder::add,
                                  MyComplexComputationBuilder::merge)
                              .build(); //prev collect returns Builder object

在 Brian 的 definitive answer to this question 中,他提到了另外两个 Java 集合框架,它们确实具有原始集合,实际上可以与原始流上的 collect 方法一起使用。我认为举例说明如何将这些框架中的原始容器与原始流一起使用可能会很有用。下面的代码也适用于并行流。

// Eclipse Collections
List<Integer> integers = Interval.oneTo(5).toList();

Assert.assertEquals(
        IntInterval.oneTo(5),
        integers.stream()
                .mapToInt(Integer::intValue)
                .collect(IntArrayList::new, IntArrayList::add, IntArrayList::addAll));

// Trove Collections

Assert.assertEquals(
        new TIntArrayList(IntStream.range(1, 6).toArray()),
        integers.stream()
                .mapToInt(Integer::intValue)
                .collect(TIntArrayList::new, TIntArrayList::add, TIntArrayList::addAll));

注意:我是Eclipse Collections 的提交者。

【讨论】:

【参考方案3】:

如果缺少某些方法,请将原始流转换为装箱对象流。

MyResult result = businessObjs.stream()
                          .mapToInt( ... )
                          .boxed()
                          .collect( new MyComplexComputation(...));

或者一开始就不要使用原始流,而是一直使用Integers。

MyResult result = businessObjs.stream()
                          .map( ... )     // map to Integer not int
                          .collect( new MyComplexComputation(...));

【讨论】:

【参考方案4】:

我已经在我的库 StreamEx 中实现了原始收集器(从 0.3.0 版开始)。有接口IntCollectorLongCollectorDoubleCollector 扩展了Collector 接口并专门用于处理原语。组合过程还有一个细微的差别,因为像 IntStream.collect 这样的方法接受 BiConsumer 而不是 BinaryOperator

有一堆预定义的收集方法可以将数字连接到字符串,存储到原始数组,到BitSet,查找最小值,最大值,求和,计算汇总统计,执行分组和分区操作。当然,您可以定义自己的收集器。下面是几个使用示例(假设您有带有输入数据的int[] input 数组)。

用分隔符将数字连接为字符串:

String nums = IntStreamEx.of(input).collect(IntCollector.joining(","));

按最后一位分组:

Map<Integer, int[]> groups = IntStreamEx.of(input)
      .collect(IntCollector.groupingBy(i -> i % 10));

分别对正数和负数求和:

Map<Boolean, Integer> sums = IntStreamEx.of(input)
      .collect(IntCollector.partitioningBy(i -> i > 0, IntCollector.summing()));

这是一个简单的benchmark,它比较了这些收集器和通常的对象收集器。

请注意,我的库不提供(将来也不会提供)任何用户可见的数据结构,例如图元上的地图,因此分组是按通常的HashMap 执行的。但是,如果您使用的是 Trove/GS/HFTC/whatever,为这些库中定义的数据结构编写额外的原始收集器以获得更高的性能并不难。

【讨论】:

【参考方案5】:

先生。 Geotz provided the definitive answer for why the decision was made not to include specialized Collectors,然而,我想进一步调查这个决定对性能的影响程度。

我想我会发布我的结果作为答案。

我使用jmh microbenchmark framework 来计算使用两种收集器对大小为 1、100、1000、100,000 和 100 万的集合进行计算所需的时间:

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
public class MyBenchmark 

@Param("1", "100", "1000", "100000", "1000000")
public int size;

List<BusinessObj> seqs;

@Setup
public void setup()
    seqs = new ArrayList<BusinessObj>(size);
    Random rand = new Random();
    for(int i=0; i< size; i++)
        //these lengths are random but over 128 so no caching of Longs
        seqs.add(BusinessObjFactory.createOfRandomLength());
    

@Benchmark
public double objectCollector()        

    return seqs.stream()
                .map(BusinessObj::getLength)
                .collect(MyUtil.myCalcLongCollector())
                .getAsDouble();


@Benchmark
public double primitiveCollector() 

    LongStream stream= seqs.stream()
                                    .mapToLong(BusinessObj::getLength);
    return MyUtil.myCalc(stream)        
                        .getAsDouble();


public static void main(String[] args) throws RunnerException
    Options opt = new OptionsBuilder()
                        .include(MyBenchmark.class.getSimpleName())
                        .build();

    new Runner(opt).run();



结果如下:

# JMH 1.9.3 (released 4 days ago)
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 20 iterations, 1 s each
# Measurement: 20 iterations, 1 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.sample.MyBenchmark.objectCollector

# Run complete. Total time: 01:30:31

Benchmark                        (size)  Mode  Cnt          Score         Error  Units
MyBenchmark.objectCollector           1  avgt  200        140.803 ±       1.425  ns/op
MyBenchmark.objectCollector         100  avgt  200       5775.294 ±      67.871  ns/op
MyBenchmark.objectCollector        1000  avgt  200      70440.488 ±    1023.177  ns/op
MyBenchmark.objectCollector      100000  avgt  200   10292595.233 ±  101036.563  ns/op
MyBenchmark.objectCollector     1000000  avgt  200  100147057.376 ±  979662.707  ns/op
MyBenchmark.primitiveCollector        1  avgt  200        140.971 ±       1.382  ns/op
MyBenchmark.primitiveCollector      100  avgt  200       4654.527 ±      87.101  ns/op
MyBenchmark.primitiveCollector     1000  avgt  200      60929.398 ±    1127.517  ns/op
MyBenchmark.primitiveCollector   100000  avgt  200    9784655.013 ±  113339.448  ns/op
MyBenchmark.primitiveCollector  1000000  avgt  200   94822089.334 ± 1031475.051  ns/op

如您所见,原始 Stream 版本稍快一些,但即使集合中有 100 万个元素,也仅快 0.05 秒(平均)。

对于我的 API,我宁愿保持更简洁的 Object Stream 约定并使用盒装版本,因为它对性能的影响很小。

感谢所有深入了解此问题的人。

【讨论】:

这取决于您要解决的任务。在您的情况下,您的计算可能非常复杂,因此装箱开销并不重要。我几乎在我的库中实现了原始收集器,性能提升非常显着,从“按最后一位分组”任务的 30%,到字符串连接的 2 倍和“按符号求和”任务的 5 倍。查看我的基准测试和结果here。 @TagirValeev 是的,您可能是正确的。大部分时间都花在我已经拆箱的计算中 @dkatzel 尝试测量IntStream.sum()Stream&lt;Integer&gt;.reduce(0, Integer::sum, Integer::sum) 之间的速度、可扩展性 差异。您会看到在这些情况下,速度和并行加速的差异是巨大的——这就是为什么原始专业化对于基本操作是合理的。但随着操作变得越来越重量级,收益越来越低,而且拳击变得更容易接受。

以上是关于为啥原始流没有收集(收集器)?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 PHP 的垃圾收集器会降低性能,没有它如何管理内存?

为啥在执行 Java Stream 终端操作时对象没有被垃圾收集?

Java流:收集嵌套集合

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

可以用多个收集器收集的 Kotlin 流(或类似的东西)