Java Collectors.groupingBy 可以将 Stream 作为其分组项目列表返回吗?

Posted

技术标签:

【中文标题】Java Collectors.groupingBy 可以将 Stream 作为其分组项目列表返回吗?【英文标题】:Can a Java Collectors.groupingBy return a Stream as its list of grouped items? 【发布时间】:2018-11-07 11:09:53 【问题描述】:

在 C# Linq 中,GroupBy 返回 IEnumerableIGrouping 项目,而这些项目又是所选值类型的项目的 IEnumerable。这是一个例子:

var namesAndScores = new Dictionary<string, int>> 
    ["David"] = 90,
    ["Jane"] = 91,
    ["Bill"] = 90,
    ["Tina"] = 89)
;
var IEnumerable<IGrouping<int, string>> namesGroupedByScore =
    namesAndScores
        .GroupBy(
            kvp => kvp.Value,
            kvp => kvp.Key
        );

// Result:
// 90 :  David, Bill 
// 91 :  Jane 
// 89 :  Tina 

具体来说,请注意每个IGrouping&lt;int, string&gt; 都是IEnumerable&lt;string&gt;,而不是List&lt;string&gt;。 (它还有一个.Key 属性。)

GroupBy 显然必须在发出单个分组之前完全枚举输入项,但是,由于它确实发出 IEnumerable&lt;string&gt; 而不是 List&lt;string&gt;,如果不这样做可能会带来性能优势t 枚举整个分组,例如你刚刚做了.First()

除此之外:从技术上讲,我想 GroupBy 可以等到您枚举它以使用输入中的单个项目,然后发出单个 IGrouping,并且仅将输入的其余部分枚举为 @987654337 @ 被枚举,在它搜索当前组中的下一个项目时将其他组收集到其内部数据结构中,但我发现这是一个不太可能且有问题的实现,并期望 GroupBy 将在调用时完全枚举。

First() 的代码如下所示:

 var oneStudentForEachNumericScore = namesGroupedByScore
     .ToDictionary(
         grouping => grouping.Key,
         grouping => grouping.First() // does not fully enumerate the values
     );
 // Result:
 // 90 : David -- Bill is missing and we don't care
 // 91 : Jane
 // 89 : Tina

现在在 Java Streams 中,要进行分组,您必须进行收集,并且不能只给 groupingBy 收集器提供第二个 lambda 来提取值。如果您想要一个与整个输入不同的值,则必须再次映射(但请注意,groupingBy 收集器可让您在一个步骤中创建多级组...的组)。以下是与上述 C# 代码等效的代码:

Map<Integer, List<String>> namesGroupedByScore = namesAndScores
      .entrySet().stream()
      .collect(Collectors.groupingBy(
          Map.Entry::getValue,
          Collectors.mapping(
              Map.Entry::getKey,
              Collectors.toList(),
          )
      ));

这似乎不太理想。所以我的问题是:

    有什么方法可以更简单地表达这一点,而不必使用Collectors.mapping 来获取组项作为值? 为什么我们必须收集到一个完全枚举的类型?有没有办法模拟 C# 的 GroupByIEnumerable 值类型并从 Collectors.mapping() 返回 Map&lt;Integer, Stream&lt;String&gt;&gt;,或者这没有用,因为无论如何都必须完全枚举值项?或者我们是否可以编写自己的 Collectors.groupingBy,它接受一个 lambda 作为第二个参数并为我们完成这项工作,使语法更接近 Linq 的 GroupBy,并且至少具有更简洁的语法并可能稍微提高性能? 作为一项理论练习,即使没有实际用处,是否可以编写我们自己的 Java 流收集器 toStream(),它返回一个 Stream 并且不会迭代其输入,除非它被枚举(迭代一个元素时间,推迟)?

【问题讨论】:

如果你真的想要一个Map&lt;Integer, Stream&lt;String&gt;&gt;,那么你可以使用toMap收集器namesAndScores.entrySet().stream().collect(toMap(Map.Entry::getValue, e -&gt; Stream.of(e.getKey()), Stream::concat));的这个重载。 @Aominè 请注意Stream.concat 的警告:“从重复连接构造流时要小心。访问深度串联流的元素可能会导致深度调用链,甚至***Error”。这不是一个理论问题。最后,这种方法在幕后仍然存在一种存储结构,但是对于更大的组来说扩展性很差(并且不支持多次遍历)。 【参考方案1】:

虽然这些操作在某些方面看起来很相似,但它们有着根本的不同。与 Linq 的 GroupBy 操作不同,Java 的 groupingByCollector,旨在与 Stream API 的 终端操作 collect 一起使用,这本身不是中间操作,因此, 一般不能用于实现惰性流操作。

groupingBy 收集器为组使用另一个下游 Collector,因此最好在最佳情况下指定一个收集器就地执行该操作,而不是通过组的元素流式传输来执行另一个操作。虽然这些收集器不支持短路,但它们消除了将组收集到Lists 的需要,只是为了流过它们。考虑一下,例如groupingBy(f1, summingInt(f2))。将组收集到List 的情况已被认为足够普遍,以至于在您未指定收集器时隐含toList(),但在收集到列表之前映射元素的情况并未考虑这种情况.

如果你经常遇到这种情况,定义自己的收集器很容易

public static <T,K,V> Collector<T,?,Map<K,List<V>>> groupingBy(
    Function<? super T, ? extends K> key, Function<? super T, ? extends V> value) 
    return Collectors.groupingBy(key, Collectors.mapping(value, Collectors.toList()));

并像使用它

Map<Integer,List<String>> result = map.entrySet().stream()
    .collect(groupingBy(Map.Entry::getValue, Map.Entry::getKey));

并且,由于您不需要使用方法引用并且希望更接近 Linq 原始:

Map<Integer,List<String>> result = map.entrySet().stream()
        .collect(groupingBy(kvp -> kvp.getValue(), kvp -> kvp.getKey()));

但是,如前所述,如果您以后要在此地图上进行流式传输并担心此操作的非惰性,您可能希望使用与 toList() 不同的收集器。

虽然这种方法在结果值方面提供了一些灵活性,但Map 及其键是此操作中不可避免的一部分,因为Map 不仅提供存储逻辑,它的查找操作还负责形成组,这也决定了语义。例如。当您将the variant with a map supplier 与() -&gt; new TreeMap&lt;&gt;(customComparator) 一起使用时,您可能会得到与默认HashMap 完全不同的组(想想,例如String.CASE_INSENSITIVE_ORDER)。另一方面,当您提供 EnumMap 时,您可能不会获得不同的语义,但会获得完全不同的性能特征。

相比之下,您描述的 Linq 中的 GroupBy 操作看起来像是一个中间操作,在 Stream API 中根本没有挂件。正如您自己建议的那样,当第一个元素被轮询时,它仍然进行完整遍历的可能性很高,在幕后完全填充数据结构。即使实现尝试一些惰性,结果也是有限的。你可以很便宜地得到第一组的第一个元素,但如果你只对那个元素感兴趣,你根本不需要分组。第一组的第二个元素可能已经是源流的最后一个,需要完整的遍历和存储。

因此,提供这样的操作意味着一些复杂性,而与急切收集相比几乎没有什么好处。也很难想象它的并行实现(提供优于collect 操作的好处)。实际的不便并非源于此设计决策,而是源于生成的Map is not a Collection(注意单独实现Iterablewouldn’t imply having a stream() method)和决策to separate collection operations and stream operations。这两个方面导致需要使用entrySet().stream() 在地图上进行流式传输,但这超出了本问题的范围。而且,如上所述,如果您需要此功能,请首先检查 groupingBy 收集器的不同下游收集器是否无法提供所需的结果。

为了完整起见,这里是一个尝试实现惰性分组的解决方案:

public interface Group<K,V> 
    K key();
    Stream<V> values();

public static <T,K,V> Stream<Group<K,V>> group(Stream<T> s,
    Function<? super T, ? extends K> key, Function<? super T, ? extends V> value) 

    return StreamSupport.stream(new Spliterator<Group<K,V>>() 
        final Spliterator<T> sp = s.spliterator();
        final Map<K,GroupImpl<T,K,V>> map = new HashMap<>();
        ArrayDeque<Group<K,V>> pendingGroup = new ArrayDeque<>();
        Consumer<T> c;
        
        c = t -> map.compute(key.apply(t), (k,g) -> 
            V v = value.apply(t);
            if(g == null) pendingGroup.addLast(g = new GroupImpl<>(k, v, sp, c));
            else g.add(v);
            return g;
        );
        
        public boolean tryAdvance(Consumer<? super Group<K,V>> action) 
            do  while(sp.tryAdvance(c) && pendingGroup.isEmpty());
            Group<K,V> g = pendingGroup.pollFirst();
            if(g == null) return false;
            action.accept(g);
            return true;
        
        public Spliterator<Group<K,V>> trySplit() 
            return null; // that surely doesn't work in parallel
        
        public long estimateSize() 
            return sp.estimateSize();
        
        public int characteristics() 
            return ORDERED|NONNULL;
        
    , false);

static class GroupImpl<T,K,V> implements Group<K,V> 
    private final K key;
    private final V first;
    private final Spliterator<T> source;
    private final Consumer<T> sourceConsumer;
    private List<V> values;

    GroupImpl(K k, V firstValue, Spliterator<T> s, Consumer<T> c) 
        key = k;
        first = firstValue;
        source = s;
        sourceConsumer = c;
    
    public K key() 
        return key;
    
    public Stream<V> values() 
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<V>(1, Spliterator.ORDERED) 
            int pos;
            public boolean tryAdvance(Consumer<? super V> action) 
                if(pos == 0) 
                    pos++;
                    action.accept(first);
                    return true;
                
                do  while((values==null || values.size()<pos)
                           &&source.tryAdvance(sourceConsumer));
                if(values==null || values.size()<pos) return false;
                action.accept(values.get(pos++ -1));
                return true;
            
        , false);
    
    void add(V value) 
        if(values == null) values = new ArrayList<>();
        values.add(value);
    

您可以通过以下示例对其进行测试:

group(
    Stream.of("foo", "bar", "baz", "hello", "world", "a", "b", "c")
          .peek(s -> System.out.println("source traversal: "+s)),
        String::length,
        String::toUpperCase)
    .filter(h -> h.values().anyMatch(s -> s.startsWith("B")))
    .findFirst()
    .ifPresent(g -> System.out.println("group with key "+g.key()));

将打印:

source traversal: foo
source traversal: bar
group with key 3

尽可能地表明懒惰。但是

每个需要知道所有组/键的操作都需要完全遍历源,因为最后一个元素可能会引入一个新组 需要处理至少一个组的所有元素的每个操作都需要完全遍历,因为源的最后一个元素可能属于该组 如果不能及早停止,前一点甚至适用于短路操作。例如,在上面的示例中,在第二组中找到匹配项意味着对第一组的完全遍历不成功,因此对源的完全遍历

上面的例子可以改写为

Stream.of("foo", "bar", "baz", "hello", "world", "a", "b", "c")
      .peek(s -> System.out.println("source traversal: "+s))
      .filter(s -> s.toUpperCase().startsWith("H"))
      .map(String::length)
      .findFirst()
      .ifPresent(key -> System.out.println("group with key "+key));

这提供了更好的惰性(例如,如果匹配不在第一组内)。

当然,这个例子是人为的,但我有一种强烈的感觉,几乎任何具有惰性处理潜力的操作,即不需要所有组并且不需要至少一个组的所有元素,都可以重写进入一个根本不需要分组的操作。

【讨论】:

这个实现很漂亮(不过需要几个小时才能真正理解它)【参考方案2】:

以下是StreamEx 和我的图书馆AbacusUtil 为您的部分问题提供的解决方案

Map<String, Integer> namesAndScores 
             = N.asMap("David", 90, "Jane", 91, "Bill", 90, "Tina", 89);

// By StreamEx
Map<Integer, List<String>> namesGroupedByScore = EntryStream.of(namesAndScores)
                                .invert().grouping();

// By AbacusUtil
Map<Integer, List<String>> namesGroupedByScore = EntryStream.of(namesAndScores)
                                   .groupTo(Fn.value(), Fn.key());
// Or
Map<Integer, Stream<String>> namesGroupedByScore2 = 
        EntryStream.of(namesAndScores).toMap(Fn.value(), collectingAndThen(mapping(Fn.key()), Stream::of));

如果您只想保存分组后的名字:

Map<Integer, List<String>> namesAndScores3 = 
      EntryStream.of(namesAndScores).distinctByValue().groupTo(Fn.value(), Fn.key());
// Or
Map<Integer, String> namesAndScores4 = 
          EntryStream.of(namesAndScores).distinctByValue().toMap(Fn.value(), Fn.key());

如果你想最多保存 2 个值。

Map<Integer, List<String>> namesAndScores5 = EntryStream.of(namesAndScores).toMap(Fn.value(),
        MoreCollectors.mapping(Fn.key(), MoreCollectors.toList(2)));

对于剩下的问题,我相信Holger所说的:“......但我有一种强烈的感觉,几乎任何具有惰性处理潜力的操作,即不需要所有组,也不需要所有元素至少一组,可以重写为根本不需要分组的操作。”

在任何情况下,如果需要groupBy,我认为不迭代所有元素就不会存在这样的实现,无论您使用的是哪种语言。如果迭代所有元素是不必要的,很可能groupBy 是不必要或被滥用的。

【讨论】:

以上是关于Java Collectors.groupingBy 可以将 Stream 作为其分组项目列表返回吗?的主要内容,如果未能解决你的问题,请参考以下文章

Java 布尔运算

java [Java] Java常用代码#java

Java - 35 Java 实例

Java While 循环

Java 字符串

Java If ... Else