使用 JDK8 和 lambda (java.util.stream.Streams.zip) 压缩流
Posted
技术标签:
【中文标题】使用 JDK8 和 lambda (java.util.stream.Streams.zip) 压缩流【英文标题】:Zipping streams using JDK8 with lambda (java.util.stream.Streams.zip) 【发布时间】:2013-07-12 12:30:53 【问题描述】:在带有 lambda b93 的 JDK 8 中,有一个类 java.util.stream.Streams.zip in b93 可用于压缩流(这在教程 Exploring Java8 Lambdas. Part 1 by Dhananjay Nene 中有说明)。这个函数:
创建一个惰性和顺序组合的 Stream,其元素是 组合两个流的元素的结果。
但是在 b98 中这已经消失了。事实上,Streams
类甚至无法在 java.util.stream in b98 中访问。
是否已移动此功能,如果是,我如何使用 b98 简洁地压缩流?
我想到的应用程序是in this java implementation of Shen,我在其中替换了 zip 功能
static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
具有相当冗长代码的函数(不使用 b98 中的功能)。
【问题讨论】:
啊刚发现好像被彻底删除了:mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/… “探索 Java8 Lambda。第 1 部分” - 本文的新链接是 blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1 感谢@AlekseiEgorov,现在也修复了帖子中的链接 【参考方案1】:这对你有用吗?这是一个简短的函数,它懒惰地评估它正在压缩的流,因此您可以为它提供无限的流(它不需要采用被压缩的流的大小)。
如果流是有限的,它会在其中一个流用完元素时立即停止。
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;
class StreamUtils
static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
Stream<ARG1> s1,
Stream<ARG2> s2,
BiFunction<ARG1, ARG2, RESULT> combiner)
final var i2 = s2.iterator();
return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
.takeWhile(Objects::nonNull);
这是一些单元测试代码(比代码本身长得多!)
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
class StreamUtilsTest
@ParameterizedTest
@MethodSource("shouldZipTestCases")
<ARG1, ARG2, RESULT>
void shouldZip(
String testName,
Stream<ARG1> s1,
Stream<ARG2> s2,
BiFunction<ARG1, ARG2, RESULT> combiner,
Stream<RESULT> expected)
var actual = StreamUtils.zip(s1, s2, combiner);
assertEquals(
expected.collect(Collectors.toList()),
actual.collect(Collectors.toList()),
testName);
private static Stream<Arguments> shouldZipTestCases()
return Stream.of(
Arguments.of(
"Two empty streams",
Stream.empty(),
Stream.empty(),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.empty()),
Arguments.of(
"One singleton and one empty stream",
Stream.of(1),
Stream.empty(),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.empty()),
Arguments.of(
"One empty and one singleton stream",
Stream.empty(),
Stream.of(1),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.empty()),
Arguments.of(
"Two singleton streams",
Stream.of("blah"),
Stream.of(1),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.of(pair("blah", 1))),
Arguments.of(
"One singleton, one multiple stream",
Stream.of("blob"),
Stream.of(2, 3),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.of(pair("blob", 2))),
Arguments.of(
"One multiple, one singleton stream",
Stream.of("foo", "bar"),
Stream.of(4),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.of(pair("foo", 4))),
Arguments.of(
"Two multiple streams",
Stream.of("nine", "eleven"),
Stream.of(10, 12),
(BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
Stream.of(pair("nine", 10), pair("eleven", 12)))
);
private static List<Object> pair(Object o1, Object o2)
return List.of(o1, o2);
static private <T1, T2> List<Object> combine(T1 o1, T2 o2)
return List.of(o1, o2);
@Test
void shouldLazilyEvaluateInZip()
final var a = new AtomicInteger();
final var b = new AtomicInteger();
final var zipped = StreamUtils.zip(
Stream.generate(a::incrementAndGet),
Stream.generate(b::decrementAndGet),
(xa, xb) -> xb + 3 * xa);
assertEquals(0, a.get(), "Should not have evaluated a at start");
assertEquals(0, b.get(), "Should not have evaluated b at start");
final var takeTwo = zipped.limit(2);
assertEquals(0, a.get(), "Should not have evaluated a at take");
assertEquals(0, b.get(), "Should not have evaluated b at take");
final var list = takeTwo.collect(Collectors.toList());
assertEquals(2, a.get(), "Should have evaluated a after collect");
assertEquals(-2, b.get(), "Should have evaluated b after collect");
assertEquals(List.of(2, 4), list);
【讨论】:
我不得不在最后删除takeWhile
,这似乎不在java8中,但这不是问题,因为被调用者可以过滤掉压缩流时发生的任何空值大小不一样。我认为这个答案应该是第一答案,因为它是包含且可以理解的。再次感谢您。【参考方案2】:
由于我无法想象对索引集合(列表)以外的集合进行任何压缩,而且我非常喜欢简单,这将是我的解决方案:
<A,B,C> Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper)
int shortestLength = Math.min(lista.size(),listb.size());
return IntStream.range(0,shortestLength).mapToObj( i ->
return zipper.apply(lista.get(i), listb.get(i));
);
【讨论】:
我认为mapToObject
应该是mapToObj
。
如果列表不是RandomAccess
(例如在链表上)这会很慢
当然。但大多数 Java 开发人员都清楚 LinkedList 对索引访问操作的性能较差。【参考方案3】:
使用最新的 Guava 库(Streams
类)你应该可以做到
final Map<String, String> result =
Streams.zip(
collection1.stream(),
collection2.stream(),
AbstractMap.SimpleEntry::new)
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
【讨论】:
【参考方案4】:如果有人还需要这个,streamex 库中有 StreamEx.zipWith
函数:
StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);
fullNames.forEach(System.out::println); // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
【讨论】:
【参考方案5】:我谦虚地建议这个实现。结果流被截断为两个输入流中较短的一个。
public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner)
Spliterator<L> lefts = leftStream.spliterator();
Spliterator<R> rights = rightStream.spliterator();
return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics())
@Override
public boolean tryAdvance(Consumer<? super T> action)
return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
, leftStream.isParallel() || rightStream.isParallel());
【讨论】:
我喜欢你的提议。但我并不完全同意最后一个.., leftStream.isParallel() || rightStream.isParallel()
。我认为它没有效果,因为AbstractSpliterator
默认提供有限的并行性。所以我认为最终的结果会和通过false
一样。
@MiguelGamboa - 感谢您的评论。我不确定您所说的“默认有限并行”是什么意思——您有指向某些文档的链接吗?【参考方案6】:
如果你的项目中有 Guava,可以使用Streams.zip 方法(在 Guava 21 中添加):
返回一个流,其中每个元素都是将 streamA 和 streamB 的相应元素传递给函数的结果。结果流将仅与两个输入流中较短的一个一样长;如果一个流更长,它的额外元素将被忽略。生成的流不能有效地拆分。这可能会损害并行性能。
public class Streams
...
public static <A, B, R> Stream<R> zip(Stream<A> streamA,
Stream<B> streamB, BiFunction<? super A, ? super B, R> function)
...
【讨论】:
【参考方案7】:我也需要这个,所以我只是从 b93 获取源代码并将其放入“util”类中。我不得不稍微修改它以使用当前的 API。
参考这里的工作代码(风险自负...):
public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
Stream<? extends B> b,
BiFunction<? super A, ? super B, ? extends C> zipper)
Objects.requireNonNull(zipper);
Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();
// Zipping looses DISTINCT and SORTED characteristics
int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
~(Spliterator.DISTINCT | Spliterator.SORTED);
long zipSize = ((characteristics & Spliterator.SIZED) != 0)
? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
: -1;
Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
Iterator<C> cIterator = new Iterator<C>()
@Override
public boolean hasNext()
return aIterator.hasNext() && bIterator.hasNext();
@Override
public C next()
return zipper.apply(aIterator.next(), bIterator.next());
;
Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
return (a.isParallel() || b.isParallel())
? StreamSupport.stream(split, true)
: StreamSupport.stream(split, false);
【讨论】:
如果任意一个流是SIZED
而不是两者,结果流不应该是SIZED
吗?
我不这么认为。两个流都必须是 SIZED
才能使此实现工作。这实际上取决于您如何定义压缩。例如,您是否应该能够压缩两个不同大小的流?那么生成的流会是什么样子呢?我相信这就是 API 实际上省略了这个函数的原因。有很多方法可以做到这一点,由用户决定什么行为应该是“正确的”行为。您会丢弃较长流中的元素还是填充较短的列表?如果有,价值是多少?
除非我遗漏了什么,否则不需要任何演员表(例如Spliterator<A>
)。
是否有托管 Java 8 b93 源代码的网站?我找不到它。【参考方案8】:
这很棒。我必须将两个流压缩到一个 Map 中,一个流是键,另一个是值
Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
streamB,
(a, b) ->
final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
return entry;
);
System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
输出: A=苹果,B=香蕉,C=胡萝卜
【讨论】:
【参考方案9】:AOL 的cyclops-react,我参与其中,也提供了压缩功能,既通过extended Stream implementation,也实现反应流接口ReactiveSeq,通过StreamUtils,通过静态方法提供许多相同的功能到标准Java 流。
List<Tuple2<Integer,Integer>> list = ReactiveSeq.of(1,2,3,4,5,6)
.zip(Stream.of(100,200,300,400));
List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
Stream.of(100,200,300,400));
它还提供更通用的基于 Applicative 的压缩。例如。
ReactiveSeq.of("a","b","c")
.ap3(this::concat)
.ap(of("1","2","3"))
.ap(of(".","?","!"))
.toList();
//List("a1.","b2?","c3!");
private String concat(String a, String b, String c)
return a+b+c;
甚至可以将一个流中的每个项目与另一个流中的每个项目配对
ReactiveSeq.of("a","b","c")
.forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);
//ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
【讨论】:
【参考方案10】:使用 JDK8 和 lambda (gist) 压缩两个流。
public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper)
final Iterator<A> iteratorA = streamA.iterator();
final Iterator<B> iteratorB = streamB.iterator();
final Iterator<C> iteratorC = new Iterator<C>()
@Override
public boolean hasNext()
return iteratorA.hasNext() && iteratorB.hasNext();
@Override
public C next()
return zipper.apply(iteratorA.next(), iteratorB.next());
;
final boolean parallel = streamA.isParallel() || streamB.isParallel();
return iteratorToFiniteStream(iteratorC, parallel);
public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel)
final Iterable<T> iterable = () -> iterator;
return StreamSupport.stream(iterable.spliterator(), parallel);
【讨论】:
不错的解决方案和(相对)紧凑!要求您将import java.util.function.*;
和 import java.util.stream.*;
放在文件顶部。
请注意,这是对流的终端操作。这意味着对于无限流,这种方法会失效
这么多无用的包装器:这里是() -> iterator
,又是这里:iterable.spliterator()
。为什么不直接实现Spliterator
而不是Iterator
?检查@Doradus 回答***.com/a/46230233/1140754【参考方案11】:
public class Tuple<S,T>
private final S object1;
private final T object2;
public Tuple(S object1, T object2)
this.object1 = object1;
this.object2 = object2;
public S getObject1()
return object1;
public T getObject2()
return object2;
public class StreamUtils
private StreamUtils()
public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream)
Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
Iterator<Integer> integerIterator = integerStream.iterator();
return stream.map(x -> new Tuple<>(integerIterator.next(), x));
【讨论】:
【参考方案12】:zip是protonpack library提供的功能之一。
Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");
List<String> zipped = StreamUtils.zip(streamA,
streamB,
(a, b) -> a + " is for " + b)
.collect(Collectors.toList());
assertThat(zipped,
contains("A is for Apple", "B is for Banana", "C is for Carrot"));
【讨论】:
也可以在 StreamEx 中找到:amaembo.github.io/streamex/javadoc/one/util/streamex/…【参考方案13】:您提到的类的方法已移至Stream
接口本身,以支持默认方法。但似乎 zip
方法已被删除。可能是因为不清楚不同大小的流的默认行为应该是什么。但是实现所需的行为是直截了当的:
static <T> boolean every(
Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
Iterator<T> it=c2.iterator();
return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
Iterator<T> it=c2.iterator();
return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
.findFirst().orElse(null);
【讨论】:
你传递给过滤器的predicate
不是有状态的吗?这违反了方法契约,尤其是在并行处理流时不起作用。
@Andreas:这里的解决方案都不支持并行处理。由于我的方法不返回流,它们确保流不会并行运行。同样,接受答案的代码返回一个流,该流可以变成并行,但实际上不会并行执行任何操作。也就是说,不鼓励有状态的谓词但不违反合同。如果您确保状态更新是线程安全的,它们甚至可以在并行上下文中使用。在某些情况下,它们是不可避免的,例如将流变成不同的流是一个有状态的谓词本身。
@Andreas:您可能会猜到为什么这些操作已从 Java API 中删除……【参考方案14】:
Lazy-Seq 库提供 zip 功能。
https://github.com/nurkiewicz/LazySeq
这个库深受scala.collection.immutable.Stream
的启发,旨在提供不可变、线程安全且易于使用的惰性序列实现,可能是无限的。
【讨论】:
以上是关于使用 JDK8 和 lambda (java.util.stream.Streams.zip) 压缩流的主要内容,如果未能解决你的问题,请参考以下文章