我可以在Java 8中复制Stream吗?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了我可以在Java 8中复制Stream吗?相关的知识,希望对你有一定的参考价值。
有时我想在流上执行一组操作,然后以不同的方式处理结果流与其他操作。
我是否可以这样做而无需指定两次常见的初始操作?
例如,我希望存在如下的dup()
方法:
Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
一般来说这是不可能的。
如果要复制输入流或输入迭代器,则有两个选项:
A. Keep everything in a collection, say a List<>
假设您将流复制到两个流s1
和s2
。如果你在n1
和s1
元素中使用n2
有高级s2
元素,你必须在记忆中保留|n2 - n1|
元素,以保持步伐。如果您的流是无限的,则可能没有所需存储的上限。
看看Python的tee()
,看看它需要什么:
这个itertool可能需要大量的辅助存储(取决于需要存储多少临时数据)。通常,如果一个迭代器在另一个迭代器启动之前使用大部分或全部数据,则使用
list()
而不是tee()
会更快。
B. When possible: Copy the state of the generator that creates the elements
要使此选项起作用,您可能需要访问流的内部工作方式。换句话说,生成器 - 创建元素的部分 - 应该首先支持复制。 [OP:请参阅此great answer,作为如何在问题中为示例执行此操作的示例]
它不适用于用户的输入,因为您必须复制整个“外部世界”的状态。 Java的Stream
不支持复制,因为它设计得尽可能通用,专门用于处理文件,网络,键盘,传感器,随机性等。[OP:另一个例子是按需读取温度传感器的流。不存储读数副本就不能复制]
这不仅仅是Java中的情况;这是一般规则。您可以看到C ++中的std::istream
仅支持移动语义,而不支持复制语义(“复制构造函数(已删除)”),因此(以及其他)。
无法以这种方式复制流。但是,您可以通过将公共部分移动到方法或lambda表达式来避免代码重复。
Supplier<IntStream> supplier = () ->
IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);
如果你正在缓冲你在一个副本中消耗的元素,但在另一个副本中却没有。
我们在duplicate()
中实现了一个jOOλ方法,这是一个开源库,我们创建它来改进jOOQ的集成测试。基本上,你可以写:
Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();
(注意:我们目前需要打包流,因为我们还没有实现IntSeq
)
在内部,有一个LinkedList
缓冲区,存储从一个流消耗但不从另一个流消耗的所有值。如果您的两个流以相同的速率消耗,那么这可能是有效的。
以下是算法的工作原理:
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
final LinkedList<T> gap = new LinkedList<>();
final Iterator<T> it = stream.iterator();
@SuppressWarnings("unchecked")
final Iterator<T>[] ahead = new Iterator[] { null };
class Duplicate implements Iterator<T> {
@Override
public boolean hasNext() {
if (ahead[0] == null || ahead[0] == this)
return it.hasNext();
return !gap.isEmpty();
}
@Override
public T next() {
if (ahead[0] == null)
ahead[0] = this;
if (ahead[0] == this) {
T value = it.next();
gap.offer(value);
return value;
}
return gap.poll();
}
}
return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
事实上,使用jOOλ,你将能够像这样写一个完整的单行:
Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
.map1(s -> s.filter(n -> n % 7 == 0))
.map2(s -> s.filter(n -> n % 5 == 0));
// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)
// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);
您还可以将流生成移动到单独的方法/函数中,该方法/函数返回此流并将其调用两次。
要么,
- 将初始化移动到方法中,然后再次调用该方法
这样做的好处是可以明确地了解您正在做什么,并且还适用于无限流。
- 收集流然后重新流
在你的例子中:
final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();
然后
final IntStream s = IntStream.of(arr);
更新:这不起作用。在原始答案的文本之后,请参阅下面的说明。
我有多傻。我需要做的就是:
Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10
解释为什么这不起作用:
如果你编码并尝试收集两个流,第一个将收集罚款,但尝试流第二个将抛出异常:java.lang.IllegalStateException: stream has already been operated upon or closed
。
详细说明,流是有状态对象(顺便说一句,它不能被重置或重绕)。您可以将它们视为迭代器,而迭代器又像指针一样。所以stream14
和stream10
可以被认为是对同一指针的引用。一直使用第一个流将导致指针“越过末尾”。尝试使用第二个流就像尝试访问已经“越过末尾”的指针,这自然是非法操作。
如接受的答案所示,创建流的代码必须执行两次,但可以将其划分为Supplier
lambda或类似的构造。
完整的测试代码:保存到Foo.java
,然后javac Foo.java
,然后java Foo
import java.util.stream.IntStream;
public class Foo {
public static void main (String [] args) {
IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
IntStream s1 = s.filter(n -> n % 5 == 0);
s1.forEach(n -> System.out.println(n));
IntStream s2 = s.filter(n -> n % 7 == 0);
s2.forEach(n -> System.out.println(n));
}
}
输出:
$ javac Foo.java
$ java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
at Foo.main(Foo.java:8)
对于非无限流,如果您有权访问源,则直接:
@Test
public void testName() throws Exception {
List<Integer> integers = Arrays.asList(1, 2, 4, 5, 6, 7, 8, 9, 10);
Stream<Integer> stream1 = integers.stream();
Stream<Integer> stream2 = integers.stream();
stream1.forEach(System.out::println);
stream2.forEach(System.out::println);
}
版画
1 2 4 5 6 7 8 9 10
1 2 4 5 6 7 8 9 10
对于你的情况:
Stream originalStream = IntStream.range(1, 100).filter(n -> n % 2 == 0)
List<Integer> listOf = originalStream.collect(Collectors.toList())
Stream stream14 = listOf.stream().filter(n -> n % 7 == 0);
Stream stream10 = listOf.stream().filter(n -> n % 5 == 0);
对于表演等,请阅读别人的答案;)
以上是关于我可以在Java 8中复制Stream吗?的主要内容,如果未能解决你的问题,请参考以下文章