如何在不从 DataFrame 转换并访问它的情况下将列添加到 Dataset?

Posted

技术标签:

【中文标题】如何在不从 DataFrame 转换并访问它的情况下将列添加到 Dataset?【英文标题】:How to add a column to Dataset without converting from a DataFrame and accessing it? 【发布时间】:2016-11-15 11:08:22 【问题描述】:

我知道使用.withColumn()UDF 向Spark DataSet 添加新列的方法,它返回一个DataFrame。我也知道,我们可以将生成的 DataFrame 转换为 DataSet。

我的问题是:

    如果我们仍然遵循传统的 DF 方法(即将列名作为字符串传递给 UDF 的输入),DataSet 的类型安全如何发挥作用 是否有一种“面向对象的方式”来访问列(不将列名作为字符串传递),就像我们过去对 RDD 所做的那样,用于追加新列。 如何在 map、filter 等正常操作中访问新列?

例如:

    scala> case class Temp(a : Int, b : String)    //creating case class
    scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS    // creating DS
    scala> val appendUDF = udf( (b : String) => b + "ing")      // sample UDF

    scala> df.withColumn("c",df("b"))   // adding a new column
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

    scala> res5.as[Temp]   // converting to DS
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

    scala> res6.map( x =>x.  
    // list of autosuggestion :
    a   canEqual   equals     productArity     productIterator   toString   
    b   copy       hashCode   productElement   productPrefix 

我使用.withColumn() 添加的新列c 不可访问,因为c 列不在案例类Temp 中(它仅包含ab)在使用res5.as[Temp] 将其转换为 DS 的瞬间。

如何访问列c

【问题讨论】:

【参考方案1】:

Datasets 的类型安全世界中,您可以将一个结构映射到另一个结构。

也就是说,对于每个转换,我们都需要数据的模式表示(因为它是 RDD 所需要的)。要访问上面的“c”,我们需要创建一个新的模式来提供对它的访问。

case class A(a:String)
case class BC(b:String, c:String)
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC

val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]

val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]

【讨论】:

有没有其他方法可以在不传递字符串的情况下添加列? @vdep String 只是跟随问题脉络的一个例子。 不,我的意思是我们可以在不将列名b 作为字符串传递的情况下做到这一点:df.withColumn("c",df("b")) @vdep 你能想象一个 API 吗?如果您负责数据集的 API,会怎么做?我很好奇你为什么这么需要与众不同。 @JacekLaskowski 例如,如果我可以像这样使用它:df.withColumn("c",df.b) 而不是 df.withColumn("c",df("b")),我将能够在编译时捕获诸如拼写错误的列名或访问不存在的列等错误阶段本身,而不是在运行时获得“列未找到异常”。这只是我的拙见/意见,不是主要需求或要求。【参考方案2】:

只是为了添加到@maasg 的出色答案......

如果我们仍然遵循传统的 DF 方法(即将列名作为字符串传递给 UDF 的输入),DataSet 的类型安全如何发挥作用

让我用另一个问题来回答这个问题“我们在'我们仍在关注......'”中是谁?如果您认为我,我不同意,并且仅在我懒得创建案例类来描述要使用的数据集时才使用 DataFrames。

我对 UDF 的回答是远离 UDF,除非它们非常简单并且 Spark Optimizer 无法优化。是的,我确实相信 UDF 太容易定义和使用,以至于我自己被冲昏了太多次(过度)使用它们。 Spark SQL 2.0 中提供了大约 239 个函数,您可能很难(呃)想出一个没有 UDF 而是标准函数的解决方案。

scala> spark.version
res0: String = 2.1.0-SNAPSHOT

scala> spark.catalog.listFunctions.count
res1: Long = 240

(上面240是因为我注册了一个UDF)。

您应该始终使用标准函数,因为它们可以进行优化。 Spark 可以控制您正在执行的操作,从而优化您的查询。

您还应该使用数据集(不是Dataset[Row],即DataFrame),因为它们使您可以访问对字段的类型安全访问。

(然而,一些数据集“好东西”也无法优化,因为数据集编程都是关于 Scala 自定义代码,Spark 无法像基于 DataFrame 的代码那样优化)。

是否有一种“面向对象的方式”来访问列(不将列名作为字符串传递),就像我们过去对 RDD 所做的那样,用于附加新列。

是的。当然。用例类定义您的数据集架构并使用字段。访问和添加(@maasg 对此做出了很好的回应,所以我不会在这里重复他的话)。

如何在 map、filter 等正常操作中访问新列?

简单...再次。使用描述数据集(架构)的案例类。如何向现有对象添加新的“东西”?你不能,除非它以某种方式已经接受了一个新的专栏,不是吗?

在“访问列或追加新列的“面向对象方式”中。”如果您的列是案例类的属性,则不能说“这是一个描述数据的类,同时又说这是一个可能具有新属性的类”。这在 OOP/FP 中是不可能的,是吗?

这就是为什么添加新列归结为使用另一个案例类或使用withColumn。那有什么问题?我认为……只是……没有错。

【讨论】:

以上是关于如何在不从 DataFrame 转换并访问它的情况下将列添加到 Dataset?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不从参考节点获取所有数据的情况下获取 Firebase 数据库中的随机键?

如何在不从当前活动选项卡中获取焦点的情况下将子窗口添加到 QMdiArea(设置为 TAB 模式)?

如何在不从表单中删除数据的情况下重置 BootstrapValidator? [复制]

PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

如何在不从磁盘中删除文件的情况下 git rm 文件? [复制]