Spark join和cogroup算子
Posted 小帆的帆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark join和cogroup算子相关的知识,希望对你有一定的参考价值。
转载请标明出处:小帆的帆的专栏
join
下面的代码包括RDD和DataFrame的join操作, 注释中有详细描述
import org.apache.spark.sql.types.DataTypes, StructField, StructType
import org.apache.spark.sql.Row, SQLContext
import org.apache.spark.SparkConf, SparkContext
object Run
def main(args: Array[String])
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
/**
* id name
* 1 zhangsan
* 2 lisi
* 3 wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
/**
* id age
* 1 30
* 2 29
* 4 21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
/** *******************************RDD **********************************/
println("*********************************RDD**********************************")
println("\\n内关联(inner join)\\n")
// 内关联(inner join)
// 只保留两边id相等的部分
/**
* (1,(zhangsan,30))
* (2,(lisi,29))
*/
idName.join(idAge).collect().foreach(println)
println("\\n左外关联(left out join)\\n")
// 左外关联(left out join)
// 以左边的数据为标准, 左边的数据一律保留
// 右边分三情况:
// 一: 左边的id, 右边有, 则合并数据; (1,(zhangsan,Some(30)))
// 二: 左边的id, 右边没有, 则右边为空; (3,(wangwu,None))
// 三: 右边的id, 左边没有, 则不保留; 右边有id为4的行, 但结果中并未保留
/**
* (1,(zhangsan,Some(30)))
* (2,(lisi,Some(29)))
* (3,(wangwu,None))
*/
idName.leftOuterJoin(idAge).collect().foreach(println)
println("\\n右外关联(right outer join)\\n")
// 右外关联(right outer join)
// 以右边的数据为标准, 右边的数据一律保留
// 左边分三种情况:
// 一: 右边的id, 左边有, 则合并数据; (1,(Some(zhangsan),30))
// 二: 右边的id, 左边没有, 则左边为空; (4,(None,21))
// 三: 左边的id, 右边没有, 则不保留; 左边有id为3的行, 但结果中并为保留
/**
* (1,(Some(zhangsan),30))
* (2,(Some(lisi),29))
* (4,(None,21))
*/
idName.rightOuterJoin(idAge).collect().foreach(println)
println("\\n全外关联(full outer join)\\n")
// 全外关联(full outer join)
/**
*
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)
/** *******************************DataFrame **********************************/
val schema1 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("name", DataTypes.StringType, nullable = true)))
val idNameDF = sqlContext.createDataFrame(idName.map(t => Row(t._1, t._2)), schema1)
val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true)))
val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2)
println("*********************************DataFrame**********************************")
println("\\n内关联(inner join)\\n")
// 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println)
// 这里只是调用了封装的API
idNameDF.join(idAgeDF, "id").collect().foreach(println)
println("\\n左外关联(left out join)\\n")
idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println)
println("\\n右外关联(right outer join)\\n")
idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println)
println("\\n全外关联(full outer join)\\n")
idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println)
println("\\nleft semi join\\n")
// left semi join
// 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的
/**
* [1,zhangsan]
* [2,lisi]
*/
idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println)
cogroup与join的笛卡尔积
当出现相同Key时, join会出现笛卡尔积, 而cogroup的处理方式不同
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
object Run
def main(args: Array[String])
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
/**
* id name
* 1 zhangsan
* 2 lisi
* 3 wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
/**
* id age
* 1 30
* 2 29
* 4 21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
println("\\ncogroup\\n")
/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(30)))
* (2,(CompactBuffer(lisi),CompactBuffer(29)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
* (4,(CompactBuffer(),CompactBuffer(21)))
*/
idName.cogroup(idAge).collect().foreach(println)
println("\\njoin\\n")
// fullOuterJoin于cogroup的结果类似, 只是数据结构不一样
/**
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)
/**
* id score
* 1 100
* 2 90
* 2 95
*/
val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95)))
println("\\ncogroup, 出现相同id时\\n")
/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(100)))
* (2,(CompactBuffer(lisi),CompactBuffer(90, 95)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
*/
idName.cogroup(idScore).collect().foreach(println)
println("\\njoin, 出现相同id时\\n")
/**
* (1,(Some(zhangsan),Some(100)))
* (2,(Some(lisi),Some(90)))
* (2,(Some(lisi),Some(95)))
* (3,(Some(wangwu),None))
*/
idName.fullOuterJoin(idScore).collect().foreach(println)
参考链接
以上是关于Spark join和cogroup算子的主要内容,如果未能解决你的问题,请参考以下文章