你能把一个流分成两个流吗?

Posted

技术标签:

【中文标题】你能把一个流分成两个流吗?【英文标题】:Can you split a stream into two streams? 【发布时间】:2013-11-25 06:13:55 【问题描述】:

我有一个由 Java 8 流表示的数据集:

Stream<T> stream = ...;

我可以看到如何过滤它以获取随机子集 - 例如

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我还可以看到如何减少此流,例如,获得代表数据集的两个随机部分的两个列表,然后将它们转换回流。 但是,有没有一种直接的方法可以从最初的流中生成两个流?类似的东西

(heads, tails) = stream.[some kind of split based on filter]

感谢您的任何见解。

【问题讨论】:

马克的回答比路易斯的回答更有帮助,但我必须说路易斯的回答与原始问题更相关。问题主要集中在将Stream 转换为多个Streams 没有中间转换的可能性,尽管我认为遇到这个问题的人实际上正在寻找实现这一目标的方法,而不管这种限制如何,这是马克的回答。这可能是由于标题中的问题与描述中的问题不同 【参考方案1】:

收集器可用于此。

对于两个类别,使用Collectors.partitioningBy() factory。

这将创建一个Map&lt;Boolean, List&gt;,并根据Predicate 将项目放入一个或另一个列表中。

注意:由于流需要被整个消费,这不能在无限流上工作。而且因为流无论如何都会被消耗,所以这个方法只是将它们放在列表中,而不是创建一个新的带内存的流。如果您需要流作为输出,您可以随时流式传输这些列表。

此外,不需要迭代器,即使在您提供的仅用于头部的示例中也是如此。

二进制拆分如下所示:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
如需更多类别,请使用Collectors.groupingBy() 工厂。
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是Stream,而是像IntStream 这样的原始流之一,则此.collect(Collectors) 方法不可用。您必须在没有收集器工厂的情况下手动进行。它的实现如下所示:

[2020-04-16 以来的示例 2.0]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> 
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            );

在此示例中,我使用初始集合的完整大小初始化 ArrayLists(如果知道的话)。即使在最坏的情况下,这也可以防止调整大小事件,但可能会占用 2NT 空间(N = 初始元素数,T = 线程数)。为了以空间换取速度,您可以忽略它或使用您最有根据的猜测,例如一个分区中预期的最多元素数(通常刚好超过 N/2 以实现平衡拆分)。

我希望我不会因为使用 Java 9 方法而冒犯任何人。对于 Java 8 版本,请查看编辑历史记录。

【讨论】:

美丽。但是,在并行化流的情况下, IntStream 的最后一个解决方案不会是线程安全的。解决方案比您想象的要简单得多...stream.boxed().collect(...);!它会像宣传的那样:将原始 IntStream 转换为装箱的 Stream&lt;Integer&gt; 版本。 这应该是公认的答案,因为它直接解决了 OP 问题。 如果找到更好的答案,我希望 Stack Overflow 允许社区覆盖所选答案。 我不确定这是否能回答问题。该问题要求将流拆分为流 - 而不是列表。 累加器函数不必要地冗长。而不是(map, x) -&gt; boolean partition = p.test(x); List&lt;Integer&gt; list = map.get(partition); list.add(x); ,您可以简单地使用(map, x) -&gt; map.get(p.test(x)).add(x)。此外,我看不出collect 操作不应该是线程安全的任何理由。它的工作方式与预期的工作方式完全相同,并且与Collectors.partitioningBy(p) 的工作方式非常接近。但是当不使用boxed() 时,我会使用IntPredicate 而不是Predicate&lt;Integer&gt;,以避免装箱两次。【参考方案2】:

我自己偶然发现了这个问题,我觉得分叉流有一些可以证明是有效的用例。我作为消费者编写了下面的代码,因此它不会做任何事情,但您可以将其应用于函数以及您可能遇到的任何其他事情。

class PredicateSplitterConsumer<T> implements Consumer<T>

  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  

  @Override
  public void accept(T t)
  
    if (predicate.test(t))
    
      positiveConsumer.accept(t);
    
    else
    
      negativeConsumer.accept(t);
    
  

现在你的代码实现可能是这样的:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));

【讨论】:

【参考方案3】:

很遗憾,你的要求在JavaDoc of Stream中被直接否决:

应该对流进行操作(调用中间或终端 流操作)只有一次。例如,这排除了“分叉” 流,其中相同的源为两个或多个管道提供数据,或者 多次遍历同一流。

如果您确实需要这种行为,您可以使用peek 或其他方法解决此问题。在这种情况下,您应该做的不是尝试使用分叉过滤器从同一原始 Stream 源返回两个流,而是复制您的流并适当地过滤每个重复项。

但是,您可能希望重新考虑 Stream 是否适合您的用例。

【讨论】:

javadoc 的措辞不排除分割成多个流,只要单个流项只进入其中的一个 @ThorbjørnRavnAndersen 我不确定复制流项目是分叉流的主要障碍。主要问题是分叉操作本质上是一个终端操作,所以当你决定分叉时,你基本上是在创建某种集合。例如。我可以编写一个方法List&lt;Stream&gt; forkStream(Stream s),但我的结果流将至少部分由集合支持,而不是直接由底层流支持,而不是filter,它不是终端流操作。 这是我觉得 Java 流与github.com/ReactiveX/RxJava/wiki 相比有点半途而废的原因之一,因为流的重点是对可能无限的元素集和现实世界的操作频繁地应用操作需要拆分、复制和合并流。【参考方案4】:

不完全是。你不能从一个中得到两个Streams;这没有任何意义——你将如何迭代一个而不需要同时生成另一个?一个流只能被操作一次。

但是,如果您想将它们转储到列表或其他内容中,您可以这样做

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));

【讨论】:

为什么没有意义?由于流是管道,它没有理由不能创建原始流的两个生产者,我可以看到这是由提供两个流的收集器处理的。 不是线程安全的。尝试直接添加到集合的坏建议,这就是为什么我们有 stream.collect(...) 用于预定义线程安全的 Collectors,即使在非线程安全的集合上也能很好地工作(没有同步锁争用)。 @MarkJeronimus 的最佳答案。 @JoD 如果 head 和 tails 是线程安全的,则它是线程安全的。此外,假设使用非并行流,仅不能保证顺序,因此它们是线程安全的。由程序员来解决并发问题,所以如果集合是线程安全的,这个答案是非常合适的。 @Nixon 它不适合我们这里有更好的解决方案。拥有这样的代码可能会导致不好的先例,导致其他人以错误的方式使用它。即使不使用并行流,也只有一步之遥。良好的编码实践要求我们不要在流操作期间维护状态。接下来我们要做的是在 Apache spark 之类的框架中编码,同样的做法确实会导致意想不到的结果。这是一个创造性的解决方案,我认为这是我不久前自己写的。 @JoD 这不是一个更好的解决方案,实际上效率更低。这种思路最终得出的结论是,所有集合默认情况下都应该是线程安全的,以防止意外后果,这是完全错误的.【参考方案5】:

这违背了 Stream 的一般机制。假设您可以根据需要将 Stream S0 拆分为 Sa 和 Sb。在 Sa 上执行任何终端操作,比如count(),必然会“消耗”S0 中的所有元素。因此 Sb 丢失了它的数据源。

我认为,以前 Stream 有一个 tee() 方法,它将一个流复制到两个。现已删除。

不过,Stream 有一个 peek() 方法,您也许可以使用它来满足您的要求。

【讨论】:

peek 正是以前的 tee 使用 Java 12 Collectors 获得了一个新方法 teeing(),但是,它有点难以管理。一个例子是here。【参考方案6】:

不完全是,但您可以通过调用Collectors.groupingBy() 来完成您需要的事情。您创建一个新集合,然后可以在该新集合上实例化流。

【讨论】:

【参考方案7】:

可以得到两个Stream一个 从 Java 12 开始,teeing 在 100 次掷硬币中计算正面和反面

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
List<Long> list = Stream.iterate(0, i -> coin.nextInt())
    .limit(100).collect(teeing(
        filtering(i -> i == 1, counting()),
        filtering(i -> i == 0, counting()),
        (heads, tails) -> 
          return(List.of(heads, tails));
        ));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));

获取例如:heads:51 tails:49

【讨论】:

docs.oracle.com/en/java/javase/12/docs/api/java.base/java/util/…【参考方案8】:

这是我能想到的最不坏的答案。

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test 

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) 

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    

    public static void main(String[] args) 

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    


这需要一个整数流并将它们拆分为 5。对于大于 5 的整数,它只过滤偶数并将它们放入一个列表中。对于其余部分,它将它们与 | 连接起来。

输出:

 ([6, 8],0|1|2|3|4|5)

这并不理想,因为它将所有内容收集到中断流的中间集合中(并且有太多参数!)

【讨论】:

【参考方案9】:

我在寻找一种从流中过滤某些元素并将它们记录为错误的方法时偶然发现了这个问题。因此,我并不真正需要拆分流,而是将过早的终止操作附加到具有不显眼的语法的谓词上。这是我想出的:

public class MyProcess 
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) 
    return x -> 
        if (pred.test(x)) 
            return true;
        
        altAction.accept(x);
        return false;
    ;

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) 
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    

【讨论】:

【参考方案10】:

使用 Lombok 的较短版本

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> 
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) 
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    

【讨论】:

【参考方案11】:

怎么样:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));

【讨论】:

由于供应商被调用两次,你会得到两个不同的随机集合。我认为 OP 的想法是在 same 生成的序列中将几率与偶数分开

以上是关于你能把一个流分成两个流吗?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 流可以有效地处理加入流吗?

我应该尽可能使用并行流吗?

处理流阅读器会关闭流吗?

QueueStream 可以用于结构化流吗?

GNU Readline 可以处理多个流吗?

AudioGraph 可以播放流吗?