spark1.x和2.xIterable和iterator兼容问题
Posted huiandong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark1.x和2.xIterable和iterator兼容问题相关的知识,希望对你有一定的参考价值。
1. spark 1.x 升级到spark 2.x 对于普通的spark来说,变动不大 : 1 举一个最简单的实例: spark1.x public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) { JavaPairRDD<String, Integer> testRdd = spark1Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterable<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list; } }); return spark1Rdd; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 spark2.x public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) { JavaPairRDD<String, Integer> testRdd2 = spark2Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list.iterator(); } }); return spark2Rdd; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 需要说明的是: 上面的返回的rdd就直接用输入的 RDD显然是不合理的! 只是为了用最简洁的方式介绍代码的转换而已! 可以看到 : 区别主要在于 1. spark 1.x中的Iterable对象 变成了 spark2.x中的Iterator对象 2. 相应的,对于返回值为list的RDD, spark2.x中要返回list.iterator(); 1 2 3 还是很简单的吧 问题在于 : 如果你有几个spark程序要运行在不同的环境下,(有的现场用1.x,有的现场用2.x) 你需要同时维护两种不同版本的spark,是不是耗时又耗力呢? 这个时候就需要考虑到 spark版本的兼容性,使你的程序能成功的运行在各种集群环境下 2. spark版本的兼容 写一个简单的工具类如下 : import java.util.Iterator; public class MyIterator<T> implements Iterator, Iterable { private Iterator myIterable; public MyIterator(Iterable iterable) { myIterable = iterable.iterator(); } @Override public boolean hasNext() { return myIterable.hasNext(); } @Override public Object next() { return myIterable.next(); } @Override public void remove() { myIterable.remove(); } @Override public Iterator iterator() { return myIterable; } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 只需要进行如上设计就可以实现版本的兼容了 那么应该如何应用呢? JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() { @Override public MyIterator<String> call(String s) throws Exception { String[] split = s.split("\s+"); MyIterator myIterator = new MyIterator(Arrays.asList(split)); return myIterator; } });
以上是关于spark1.x和2.xIterable和iterator兼容问题的主要内容,如果未能解决你的问题,请参考以下文章
Spark(23)——Spark1.X和Spark2.X的区别
Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x