使用spark数据帧/数据集/ RDD使用内部联接进行更新

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用spark数据帧/数据集/ RDD使用内部联接进行更新相关的知识,希望对你有一定的参考价值。

我正在将ms sql server查询逻辑转换为spark。要转换的查询如下:

Update enc set PrUid=m.OriginalPrUid
FROM CachePatDemo enc 
inner join #MergePreMap m on enc.PrUid=m.NewPrUid
WHERE StatusId is null

我正在使用数据框进行转换,我在两个数据帧中有两个表,我作为内部联接加入。我需要找到一种方法来获取表1的所有列和更新的列(这两列都是通用的)。

我试过用这个:

val result = CachePatDemo.as("df123").
  join(MergePreMap.as("df321"), CachePatDemo("prUid") === MergePreMap("prUid"),"inner").where("StatusId is null")
  select($"df123.pId", 
         $"df321.provFname".as("firstName"), 
         $"df123.lastName", 
         $"df123.prUid")

它似乎没有解决我的问题。有人可以帮忙吗?

答案

在Spark 2.1这有效

case class TestModel(x1: Int, x2: String, x3: Int)

object JoinDataFrames extends App {
  import org.apache.spark.sql.{DataFrame, SparkSession}
  val spark = SparkSession.builder.appName("GroupOperations").master("local[2]").enableHiveSupport.getOrCreate

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val list1 = (3 to 10).toList.map(i => new TestModel(i, "This is df1 " + i, i * 3))
  val list2 = (0 to 5).toList.map(i => new TestModel(i, "This is df2 " + i, i * 13))
  val df1: DataFrame = spark.sqlContext.createDataFrame[TestModel](list1)
  val df2: DataFrame = spark.sqlContext.createDataFrame[TestModel](list2)
  val res = df1.join(df2, Seq("x1"), "inner")
  println("from DF1")
  res.select(df1("x2")).show()
  println("from DF2")
  res.select(df2("x2")).show()
}

以上是关于使用spark数据帧/数据集/ RDD使用内部联接进行更新的主要内容,如果未能解决你的问题,请参考以下文章

内部联接在使用 Spark 2.1 的 DataFrame 中不起作用

从 pandas 数据帧转换为 LabeledPoint RDD

Spark 2.0 数据集与数据帧

使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/翻译

Spark 中的数据框和数据集

Spark RDD 详细介绍