使用spark数据帧/数据集/ RDD使用内部联接进行更新
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用spark数据帧/数据集/ RDD使用内部联接进行更新相关的知识,希望对你有一定的参考价值。
我正在将ms sql server查询逻辑转换为spark。要转换的查询如下:
Update enc set PrUid=m.OriginalPrUid
FROM CachePatDemo enc
inner join #MergePreMap m on enc.PrUid=m.NewPrUid
WHERE StatusId is null
我正在使用数据框进行转换,我在两个数据帧中有两个表,我作为内部联接加入。我需要找到一种方法来获取表1的所有列和更新的列(这两列都是通用的)。
我试过用这个:
val result = CachePatDemo.as("df123").
join(MergePreMap.as("df321"), CachePatDemo("prUid") === MergePreMap("prUid"),"inner").where("StatusId is null")
select($"df123.pId",
$"df321.provFname".as("firstName"),
$"df123.lastName",
$"df123.prUid")
它似乎没有解决我的问题。有人可以帮忙吗?
答案
在Spark 2.1这有效
case class TestModel(x1: Int, x2: String, x3: Int)
object JoinDataFrames extends App {
import org.apache.spark.sql.{DataFrame, SparkSession}
val spark = SparkSession.builder.appName("GroupOperations").master("local[2]").enableHiveSupport.getOrCreate
import spark.implicits._
import org.apache.spark.sql.functions._
val list1 = (3 to 10).toList.map(i => new TestModel(i, "This is df1 " + i, i * 3))
val list2 = (0 to 5).toList.map(i => new TestModel(i, "This is df2 " + i, i * 13))
val df1: DataFrame = spark.sqlContext.createDataFrame[TestModel](list1)
val df2: DataFrame = spark.sqlContext.createDataFrame[TestModel](list2)
val res = df1.join(df2, Seq("x1"), "inner")
println("from DF1")
res.select(df1("x2")).show()
println("from DF2")
res.select(df2("x2")).show()
}
以上是关于使用spark数据帧/数据集/ RDD使用内部联接进行更新的主要内容,如果未能解决你的问题,请参考以下文章
内部联接在使用 Spark 2.1 的 DataFrame 中不起作用
从 pandas 数据帧转换为 LabeledPoint RDD