除非我更改列名,否则不会应用 DataFrame 用户定义的函数

Posted

技术标签:

【中文标题】除非我更改列名,否则不会应用 DataFrame 用户定义的函数【英文标题】:DataFrame user-defined function not applied unless I change column name 【发布时间】:2018-02-26 10:19:27 【问题描述】:

我想使用隐式函数定义转换我的 DataFrame 列。

我定义了我的 DataFrame 类型,其中包含附加功能:

class MyDF(df: DataFrame) 
    def bytes2String(colName: String): DataFrame = df
       .withColumn(colname + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colname)))
       .drop(colname)
       .withColumnRenamed(colname + "_tmp", colname)

然后我定义我的隐式转换类:

object NpDataFrameImplicits 
    implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)

最后,这是我在一个小型 FunSuite 单元测试中所做的:

test("example: call to bytes2String") 
    val df: DataFrame = ...
    df.select("header.ID").show() // (1)
    df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
    df.bytes2String("header.ID").select("header.ID").show() // (3)

显示 #1

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

显示 #2

+------------------------------------+
|id                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

显示 #3

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

正如您在这里看到的那样,第三个 show(也就是没有重命名列)没有按预期工作,并向我们展示了一个未转换的 ID 列。有人知道为什么吗?

编辑:

df.select(col("header.ID") as "ID").bytes2String("ID").show()的输出:

+------------------------------------+
|ID                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

【问题讨论】:

df.select("header.ID" as "ID").bytes2String("ID").show() 的输出是什么? @RumeshKrish 我附加了你所问的结果。你可知道发生了什么?为什么 3 show 没有按预期工作? 原因是复杂结构字段没有发生数据帧转换。您正在尝试隐藏 StructType 字段的字段值。 df.select("header.ID" as "ID") 将值分解为根级别,然后您正在转换字段级别。 我只是不明白“您试图隐藏 StructType 字段的字段值。” -> 这是什么意思?感谢您的帮助;) @belka希望下面的回答能帮助你找到原因。 【参考方案1】:

让我用下面的例子解释一下你的转换函数发生了什么。 首先创建数据框:

val jsonString: String =
    """
      | "employee": 
      |   "id": 12345,
      |   "name": "krishnan"
      | ,
      | "_id": 1
      |""".stripMargin

  val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))

  val df: DataFrame = sparkSession.read.json(jsonRDD)
  df.printSchema()

输出结构:

root
 |-- _id: long (nullable = true)
 |-- employee: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)

转换功能类似于你的:

def myConversion(myDf: DataFrame, colName: String): DataFrame = 
    myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
  

场景一#root 级别字段进行转换。

myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()

结果:

+----------------+---+
|        employee|_id|
+----------------+---+
|[12345,krishnan]|  2|
|[12345,krishnan]|  2|
+----------------+---+
+---+
|_id|
+---+
|  2|
|  2|
+---+

场景 2#employee.id 进行转换。在这里,当我们使用employee.id 表示时,数据框在root 级别添加了新字段id。这是正确的行为。

myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()

结果:

+---+----------------+-----------+
|_id|        employee|employee.id|
+---+----------------+-----------+
|  1|[12345,krishnan]|      12346|
|  1|[12345,krishnan]|      12346|
+---+----------------+-----------+
+-----+
|   id|
+-----+
|12345|
|12345|
+-----+

场景3#选择内层为根级别,然后进行转换。

myConversion(df.select("employee.id"), "id").show()

结果:

+-----+
|   id|
+-----+
|12346|
|12346|
+-----+

我的新转换函数,获取结构类型字段并执行转换并将其存储到结构类型字段本身中。在这里,传递employee 字段并单独转换id 字段,但更改是在root 级别完成employee 字段。

case class Employee(id: String, name: String)

def myNewConversion(myDf: DataFrame, colName: String): DataFrame = 
    myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
  

你的场景编号 3# 使用我的转换函数。

myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()

结果#

+---+----------------+
|_id|        employee|    
+---+----------------+
|  1|[12346,krishnan]|
|  1|[12346,krishnan]|
+---+----------------+
+-----+
|   id|
+-----+
|12346|
|12346|
+-----+

【讨论】:

您好@RumeshKrish 感谢您的回答,抱歉耽搁了。在您的场景#3中,我不明白您为什么选择列“id”而不是“employee.id”。我在本地进行了测试,它确实有效,但为什么呢?而且我仍然不明白为什么我必须在实际看到更改之前重命名该列...... 你能做df.select("employee.id")看看。您将获得名为id 的单列。数据框是不可变的。您无法转换任何 coulmn 的值。您必须定义转换规则udf 或函数以传递df 并获取新的df。这是行为。选择并转换或定义employee 列的转换规则,这是获取结果的唯一方法。您的案例重命名 header.ID 与 df 中的字段分开。它没有选择 header 内的字段。

以上是关于除非我更改列名,否则不会应用 DataFrame 用户定义的函数的主要内容,如果未能解决你的问题,请参考以下文章

除非在 vuejs2 中通过热重载更改代码,否则 firebase 存储不会运行

Spark DataFrame xml更改列名

除非等于 UIView 框架,否则框架不会正确更改

除非选择了项目,否则Visual Studio不会保存构建更改

SmartEdit:除非刷新页面,否则嵌套 CMS 组件中的更改不会反映

除非重新加载页面,否则 React 应用程序路由不会加载内容