RDD的distinct,intersection,union和subtract的使用问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD的distinct,intersection,union和subtract的使用问题相关的知识,希望对你有一定的参考价值。
我使用以上这四个RDD的操作来实现集合的去重,交集,并集和差集。但是,在使用的时候,拿不到我想要的结果。rdd=rdd.distinct()操作rdd.collect()无变化,rdd=rdd.intersection(rdd),自己和自己做交集竟然为空,rdd=rdd.union(rdd)结果数据量翻倍了,rdd=rdd.subtract(rdd)结果数量没变化,只是顺序有变化。请问这是为什么呢,我hive表中的数据是有重复的啊。谢谢看到的各位了,哪怕是给个思路也好,谢谢谢谢
参考技术A 在你进行rdd.distinct()的时候你能确认你rdd里面的类型一致吗?比如存放了 date 和uid 如果你进行了distinct 那么类似于sql select date,uid from table group by date,uidunion类似于sql 的union 是让两个rdd union一下 数据量肯定是翻倍的,rdd1.substract(rdd2) 是rdd1减去和rdd2重复的 数据量没有变化是不应该的 给你个网址你看看会有很大收获 http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#subtract
另外 我的解释不一定准确 你一定要做demo验证后再使用 spark还是很强大了 普通的sql可以很简单的写出来
spark 教程三 spark Map filter flatMap union distinct intersection操作
RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征
RDD的创建基础RDD
1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算
var sc=new SparkContext(conf) var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)); rdd.foreach(println)
2.hadoop 数据集(Hadoop Datasets):在一个文件的每条记录上运行函数只要文件系统是HDFS,或者hadoop支持的任意存档存储系统即可
val file=sc.textFile("hdfs://hostname:9090/path/somefile.txt")
Map的操作
map(func) 返回一个新的RDD ,由每一个元素经过func后转换组成
//设置conf master =local 为本地模式, appNamm 是应用程序的名称 var conf=new SparkConf().setMaster("local[1]").setAppName("RDD") var sc=new SparkContext(conf) // 加载一个数组,然后给没一个元素乘以2, 并且输出 var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)).map(_*2).collect().foreach(println)
filter(func) 返回一个新的RDD,由func函数计算后返回为true 的元素组成
//返回字母为b的元素 var strrdd=sc.parallelize(Array("a","b","c","d")).filter(_=="b").foreach(println)
flatMap(func) 类似于Map 但是每一个输入元素被映射成0个或多个输出元素,因此func 应该返回一个序列,而不是单一元素
//把每一个元素乘以2然后在压平,输出 var list=sc.parallelize(List(List(1,6),List(2,7),List(3,8),List(4,9),List(5,10))) list.flatMap(_.map(_*2)).foreach(println)
下面用flatMap做一个wordcount的例子,新建一个文件建立了一下几个字段
/读取本地文件然后 。用split函数 把元素拆分成单一的元素,然后再用个map 给没个元素设置一个1, 在用reduceByKey把相同key的数量相加 var txtrdd=sc.textFile("d:/517/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).foreach(println) result: (tome,1) (tom,1) (xiaoli,1) (lusi,1) (hello,7) (java,1) (jame,2)
distinct(numTasks) 返回一个数据集中不包含重复元素的新数据集 (去重)
var la=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("A",1))) var distinct=la.distinct() result: (A,1) (A,6) (B,7) (B,2)
union(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集共同构成
var lb=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("C",3))) var la=sc.parallelize(List(("A",1),("A",6),("D",2),("E",7),("A",1))) var distinct=la.union(lb) distinct.foreach(println result: (A,1) (A,6) (D,2) (E,7) (A,1) (A,1) (A,6) (B,2) (B,7) (C,3)
intersection(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集的交集构成
var lb=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("C",3))) var la=sc.parallelize(List(("A",1),("A",6),("D",2),("E",7),("A",1))) var distinct=la.intersection(lb) distinct.foreach(println) result: (A,1) (A,6)
以上是关于RDD的distinct,intersection,union和subtract的使用问题的主要内容,如果未能解决你的问题,请参考以下文章
有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?