通过谓词限制流

Posted

技术标签:

【中文标题】通过谓词限制流【英文标题】:Limit a stream by a predicate 【发布时间】:2014-01-11 20:13:16 【问题描述】:

是否存在限制(可能无限)Stream 直到第一个元素无法匹配谓词的 Java 8 流操作?

在 Java 9 中,我们可以使用takeWhile 来打印所有小于 10 的数字。

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

由于 Java 8 中没有这样的操作,以一般方式实现它的最佳方法是什么?

【问题讨论】:

可能有用的信息:***.com/q/19803058/248082 相关:Equivalent of Scala dropWhile 我想知道架构师如何在不遇到这个用例的情况下完成“我们实际上可以 使用 做什么”。从 Java 8 开始,流实际上只对现有数据结构有帮助:-/ 另见How to short-circuit a reduce() operation on a Stream? 使用Java 9,写IntStream.iterate(1, n-&gt;n&lt;10, n-&gt;n+1).forEach(System.out::print);会更容易 【参考方案1】:

操作 takeWhiledropWhile 已添加到 JDK 9。您的示例代码

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

在 JDK 9 下编译和运行时,其行为将完全符合您的预期。

JDK 9 已经发布。可在此处下载:JDK 9 Releases。

【讨论】:

直接链接到 JDK9 Stream 的预览文档,takeWhile/dropWhile:download.java.net/jdk9/docs/api/java/util/stream/Stream.html @LukasEder takeWhiledropWhile 非常普遍,出现在 Scala、Python、Groovy、Ruby、Haskell 和 Clojure 中。 skiplimit 的不对称是不幸的。也许 skiplimit 应该被称为 droptake,但除非你已经熟悉 Haskell,否则它们并不那么直观。 @StuartMarks:我知道dropXXXtakeXXX 是更流行的术语,但我个人可以接受更多SQL 风格的limitXXXskipXXX。我发现这种新的不对称比单独选择术语更令人困惑...... :)(顺便说一句:Scala 也有 drop(int)take(int) 是的,让我在生产环境中升级到 Jdk 9。许多开发人员仍在使用 Jdk8,这样的功能应该从一开始就包含在 Streams 中。 IntStream .iterate(1, n -&gt; n + 1) .takeWhile(n -&gt; n &lt; 10)可以简化为IntStream .iterate(1, n -&gt; n &lt; 10, n -&gt; n + 1)【参考方案2】:

这样的操作应该可能使用 Java 8 Stream,但不一定能有效地完成 - 例如,您不一定可以并行化这样的操作,如您必须按顺序查看元素。

API 没有提供简单的方法来实现它,但最简单的方法可能是采用 Stream.iterator(),包装 Iterator 以进行“暂时”实现,然后返回Spliterator,然后是 Stream。或者——也许——包装Spliterator,尽管在这个实现中它不能再被分割了。

这是takeWhileSpliterator 上的未经测试的实现:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) 
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) 
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) 
      if (stillGoing) 
        boolean hadNext = splitr.tryAdvance(elem -> 
          if (predicate.test(elem)) 
            consumer.accept(elem);
           else 
            stillGoing = false;
          
        );
        return hadNext && stillGoing;
      
      return false;
    
  ;


static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) 
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);

【讨论】:

理论上,使用无状态谓词并行化 takeWhile 很容易。以并行批次评估条件(假设谓词在执行几次后不会抛出或产生副作用)。问题是在 Streams 使用的递归分解(fork/join 框架)的上下文中执行此操作。确实,流效率非常低。 如果 Streams 不那么专注于自动并行性,它们会好很多。只有一小部分可以使用 Streams 的地方需要并行性。此外,如果 Oracle 如此关心性能,他们可以让 JVM JIT 自动向量化,并获得更大的性能提升,而不会打扰开发人员。现在这是正确的自动并行处理。 Java 9 发布后,您应该更新此答案。 不,@Radiodef。该问题专门要求 Java 8 解决方案。【参考方案3】:

allMatch()是一个短路函数,所以你可以用它来停止处理。主要缺点是您必须进行两次测试:一次是看是否应该处理它,另一次是看是否继续进行。

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->if (n<10) System.out.println(n);)
    .allMatch(n->n < 10);

【讨论】:

起初这对我来说似乎不直观(给定方法名称),但docs confirm Stream.allMatch() 是 short-circuiting operation。所以即使在像IntStream.iterate() 这样的无限流上也能完成。当然,回想起来,这是一个明智的优化。 这很简洁,但我认为它不能很好地传达它的意图是peek 的主体。如果我下个月遇到它,我会花一分钟的时间想知道为什么我之前的程序员检查了allMatch然后忽略了答案。 此解决方案的缺点是它返回一个布尔值,因此您无法像往常一样收集流的结果。【参考方案4】:

作为@StuartMarks answer 的后续行动。我的 StreamEx 库具有与当前 JDK-9 实现兼容的 takeWhile 操作。在 JDK-9 下运行时,它只会委托给 JDK 实现(通过 MethodHandle.invokeExact,这真的很快)。在 JDK-8 下运行时,将使用“polyfill”实现。所以使用我的库可以像这样解决问题:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);

【讨论】:

为什么没有为 StreamEx 类实现它? @Someguy 我确实实现了它。【参考方案5】:

takeWhile是protonpack library提供的功能之一。

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));

【讨论】:

【参考方案6】:

更新:Java 9 Stream 现在带有 takeWhile 方法。

无需破解或其他解决方案。就用那个吧!


我相信这可以大大改善: (也许有人可以使它成为线程安全的)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

肯定是 hack... 不优雅 - 但它有效 ~:D

class TakeWhile<T> implements Iterator<T> 

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) 
        this.iterator = s.iterator();
        this.predicate = p;
    

    @Override
    public boolean hasNext() 
        if (!keepGoing) 
            return false;
        
        if (next != null) 
            return true;
        
        if (iterator.hasNext()) 
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) 
                next = null;
            
        
        return next != null;
    

    @Override
    public T next() 
        if (next == null) 
            if (!hasNext()) 
                throw new NoSuchElementException("Sorry. Nothing for you.");
            
        
        T temp = next;
        next = null;
        return temp;
    

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) 
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    


【讨论】:

【参考方案7】:

你可以使用java8 + rxjava。

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          
                System.out.println(n);
                return n < 10;
          
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));

【讨论】:

【参考方案8】:

实际上,在 Java 8 中有两种方法可以做到这一点,无需任何额外的库或使用 Java 9。

如果您想在控制台上打印 2 到 20 的数字,您可以这样做:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

两种情况下的输出:

2
4
6
8
10
12
14
16
18
20

还没有人提到anyMatch。这就是这篇文章的原因。

【讨论】:

【参考方案9】:

这是从 JDK 9 java.util.stream.Stream.takeWhile(Predicate) 复制的源代码。为了与 JDK 8 一起工作,有一点不同。

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) 
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> 
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) 
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        

        @Override
        public boolean tryAdvance(Consumer<? super T> action) 
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t)))    // and test on element passes
                action.accept(t);           // then accept element
                return true;
             else 
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            
        

        @Override
        public Comparator<? super T> getComparator() 
            return s.getComparator();
        

        @Override
        public void accept(T t) 
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        

        @Override
        public Spliterator<T> trySplit() 
            return null;
        
    
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);

【讨论】:

【参考方案10】:

这是一个在整数上完成的版本 - 正如问题中所问的那样。

用法:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

这是 StreamUtil 的代码:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil

    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        

        @Override
        public boolean tryAdvance(IntConsumer action)
        
            if (iterator.hasNext()) 
                int value = iterator.nextInt();
                if (predicate.test(value)) 
                    action.accept(value);
                    return true;
                
            

            return false;
        
    

【讨论】:

【参考方案11】:

去获取图书馆AbacusUtil。它提供了您想要的确切 API 以及更多内容:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

声明:我是AbacusUtil的开发者。

【讨论】:

【参考方案12】:

如果您知道将要执行的具体重复次数,您可以这样做

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);

【讨论】:

虽然这可能会回答作者的问题,但它缺少一些解释性文字和文档链接。如果没有围绕它的一些短语,原始代码 sn-ps 并不是很有帮助。您可能还会发现how to write a good answer 非常有帮助。请编辑您的答案。【参考方案13】:
    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

您可以使用 mapToObj 代替峰值返回最终对象或消息

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";);
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);

【讨论】:

这应该是公认的答案,如果它一直有效的话【参考方案14】:

您不能中止流,除非通过短路终端操作,这会使某些流值未处理,而不管它们的值如何。但是,如果您只想避免对流进行操作,则可以向流中添加转换和过滤器:

import java.util.Objects;

class ThingProcessor

    static Thing returnNullOnCondition(Thing thing)
        return( (*** is condition met ***)? null : thing);    

    void processThings(Collection<Thing> thingsCollection)
    
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    
 // class ThingProcessor

当事物满足某些条件时将事物流转换为空值,然后过滤掉空值。如果您愿意沉迷于副作用,您可以在遇到某些事情时将条件值设置为 true,这样所有后续的事情都会被过滤掉,而不管它们的值如何。但即使不是,您也可以通过从流中过滤掉您不想处理的值来节省大量(如果不是全部)处理。

【讨论】:

有些匿名评分者在没有说明原因的情况下低估了我的回答,这很糟糕。所以我和任何其他读者都不知道我的回答有什么问题。在没有他们的理由的情况下,我会认为他们的批评无效,并且我发布的答案是正确的。 您的回答并没有解决 OPs 问题,即处理无限流。这似乎也使事情变得不必要地复杂化,因为您可以在 filter() 调用本身中编写条件,而无需 map()。该问题已经有一个示例代码,只需尝试将您的答案应用于该代码,您将看到程序将永远循环。【参考方案15】:

即使我也有类似的要求——调用网络服务,如果失败,重试 3 次。如果经过多次试验仍失败,请发送电子邮件通知。在谷歌上搜索了很多之后,anyMatch() 成为了救星。我的示例代码如下。在下面的示例中,如果 webServiceCall 方法在第一次迭代本身中返回 true,则流不会进一步迭代,因为我们调用了 anyMatch()。我相信,这就是你要找的。​​p>

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch 

public static void main(String[] args)         
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ)))
         //Code for sending email notifications
    


public static boolean webServiceCall(int i)
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;

【讨论】:

【参考方案16】:

如果您有不同的问题,可能需要不同的解决方案,但对于您当前的问题,我会简单地选择:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);

【讨论】:

【参考方案17】:

可能有点离题,但这是我们为List&lt;T&gt; 而不是Stream&lt;T&gt; 所拥有的。

首先你需要有一个take util 方法。此方法首先采用n 元素:

static <T> List<T> take(List<T> l, int n) 
    if (n <= 0) 
        return newArrayList();
     else 
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    

它就像scala.List.take一样工作

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

现在基于take编写takeWhile方法将相当简单

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) 
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements

它是这样工作的:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

此实现会部分迭代列表几次,但不会添加 add O(n^2) 操作。希望这是可以接受的。

【讨论】:

【参考方案18】:

我有另一个快速的解决方案,通过实现这个(这实际上是不干净的,但你明白了):

public static void main(String[] args) 
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));


static interface TerminatedStream<T> 
    Stream<T> terminateOn(T e);


static class StreamUtil 
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) 
        return new TerminatedStream<T>() 
            public Stream<T> terminateOn(T e) 
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) 
                    current = op.apply(current);
                    builder.add(current);
                
                return builder.build();
            
        ;
    

【讨论】:

您正在提前评估整个流!如果current 从来没有.equals(e),你会得到一个无限循环。即使您随后申请两者,例如.limit(1)。这比'unclean'要糟糕得多。【参考方案19】:

这是我仅使用 Java Stream 库的尝试。

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> 
                if (n < 10) 
                    System.out.println(n);
                    return false;
                 else 
                    return true;
                
            )
        .findAny();

【讨论】:

The filter predicate is supposed to be stateless. System.out.println 是副作用。

以上是关于通过谓词限制流的主要内容,如果未能解决你的问题,请参考以下文章

不支持没有相等谓词的流流连接

流分析:源“子查询”只能用于使用“datediff”函数的时间谓词

为啥 HKSample 数组对于一个没有限制、没有谓词、没有锚点的 HKAnchoredObjectQuery 总是有 1 个值?

为啥 HKSample 数组对于一个没有限制、没有谓词、没有锚点的 HKAnchoredObjectQuery 总是有 1 个值?

谓词下推

代码混淆之道——控制流扁平与不透明谓词理论篇