Scala - 如何将 Dataset[Row] 转换为可添加到 Dataframe 的列

Posted

技术标签:

【中文标题】Scala - 如何将 Dataset[Row] 转换为可添加到 Dataframe 的列【英文标题】:Scala - How to convert a Dataset[Row] to a column that can be added to a Dataframe 【发布时间】:2018-10-30 08:07:00 【问题描述】:

我正在尝试将一列的数据框添加到更大的数据框,但是第一个数据框的问题是在创建它并尝试通过命令将其添加到主数据框之后:

  df.withColumn("name", dataframe)

我得到错误:

 **found   : org.apache.spark.sql.DataFrame
 (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Column**

我知道 Dataset[Row] 应该是 Dataframe 的同义词,但我不知道如何解决这个错误。

对于上下文,我的代码的(真正)淡化版本如下:

// test function - will be used as part of the main script below
def Test(inputone: Double, inputtwo: Double): Double =  
 var test = (2 * inputone) + inputtwo
 test 

对于主脚本(即问题所在)

//Importing the data via CSV
var df = sqlContext.read.format("csv").option("header",     "true").option("inferSchema", "true").load("/root/file.csv")

给出数据的上下文:

df: org.apache.spark.sql.DataFrame = [ID: int, blue: int ... 8 more fields]

+---+----+------+-----+------+------+----+---+-----+-----+
| ID|blue|purple|green|yellow|orange|pink|red|white|black|
+---+----+------+-----+------+------+----+---+-----+-----+
|  1| 500|    44|    0|     0|     3|   0|  5|   43|    2|
|  2| 560|    33|    1|     0|     4|   0| 22|   33|    4|
|  3| 744|    44|    1|    99|     3|1000| 78|   90|    0|
+---+----+------+-----+------+------+----+---+-----+-----+

root
 |-- ID: integer (nullable = true)
 |-- blue: integer (nullable = true)
 |-- purple: integer (nullable = true)
 |-- green: integer (nullable = true)
 |-- yellow: integer (nullable = true)
 |-- orange: integer (nullable = true)
 |-- pink: integer (nullable = true)
 |-- red: integer (nullable = true)
 |-- white: integer (nullable = true)
 |-- black: integer (nullable = true)

从那时起,脚本继续

// Creating a list for which columns to draw from the main dataframe
val a = List("green", "blue")

// Creating the mini dataframe to perform the function upon
val test_df = df.select(a.map(col): _*)

// The new dataframe will now go through the 'Test' function defined above
val df_function = test_df.rdd.map(col => Test(col(0).toString.toDouble, col(1).toString.toDouble))

// Converting the RDD output back to a dataframe (of one column)
val df_convert = df_function.toDF

作为参考,输出如下所示

+-----+
|value|
+-----+
|500.0|
|562.0|
|746.0|
+-----+

脚本的最后一行是将其添加到主数据框中,如下所示

 df = df.withColumn("new column", df_convert)

但如上所述,我收到以下错误:

found   : org.apache.spark.sql.DataFrame

   (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

required: org.apache.spark.sql.Column

/////////编辑///////////

@user9819212 解决方案适用于简单的方法,但是当调用一个更复杂的方法时,我收到以下错误

    test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function5>,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType)))
    java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function1

所以我尝试创建另一个简化版本的代码,并对调用的测试函数进行一些额外的更改

// test function - will be used as part of the main script below
def Test (valueone: Double, valuetwo: Integer): Double = 
    val test = if(valuetwo > 2000) valueone + 4000 else valueone
    val fakeList = List(3000,4000,500000000)
    val index = fakeList.indexWhere(x => x>=valueone)
    val test2 = fakeList(index - 1) * valueone
    test2


val test_udf = udf(Test _)

df = df.withColumn(
   "new column", 
   test_udf(col("green").cast("double"), col("blue").cast("integer"))
)

起初这似乎可行,但是当我尝试使用命令查看数据框时

df.show

我收到以下错误

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 153.0 failed 1 times, most recent failure: Lost task 0.0 in stage 153.0 (TID 192, localhost, executor driver): 
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (double, int) => double)

【问题讨论】:

@Andrey Tyukin 我改了标题,现在更合适还是不应该引用 Dataset[Row] 类型? 标题不错,感谢更新。 查看您编辑的 Test 函数,只有一个错误,即当索引为 0 时(当 fakeList.indexWhere(x =&gt; x&gt;=valueone) 时)返回 0。您必须处理它。否则 udf 函数看起来很完美。但是您可以使用内置函数重构代码 @Ramesh Maharjan 啊,我可能又简化了太多,我的错 - 我真的只是遇到以下错误 ==> test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType))) java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function1 - 当我把它放到我的主脚本中时,如果需要,我可以提供更多详细信息 cmets 不适合代码。你能更新一下错误和test2_udf函数定义的问题以及你是如何调用它的吗? 【参考方案1】:

您不能以这种方式添加来自另一个DataFrame(或DataFrame)的列。只需使用UserDefinedFunction

import org.apache.spark.sql.functions.udf._

val test_udf = udf(Test _)

df.withColumn(
   "new column", 
   test_udf(col("green").cast("double"), col("blue").cast("double"))
)

或者用这么简单的功能:

df.withColumn(
   "new column", 
   2 * col("green").cast("double") + col("blue").cast("double")
)

【讨论】:

@user9819212 你的解决方案适用于我提供的简单函数,但是当我尝试将它实现到我的真实脚本时,我得到错误(这是我的错,因为我试图简化脚本太多了,所以在 StackExchange 上发帖会更容易)我将编辑我的原始帖子以在一分钟内解释我的意思【参考方案2】:

如果你去api document,它会被明确提到

public DataFrame withColumn(java.lang.String colName, Column col) Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

如您所见,第二个参数应该是Column,而您一直在传递DataFrame

这就是问题的原因。

您正在尝试将一列从df_convert 添加到df。这两个数据帧完全不同。对于这种情况,如果您想分离数据框,您将不得不查看join

或spark functions 与 withColumn api 一起使用作为列。

更新

查看您的第一个日志

test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(&lt;function5&gt;,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType)))

建议您将udf 函数定义为

def Test(valueone: Double, valuetwo: Integer, valuethree: String, valuefour: Double, valuefive: Double): Double = 
  ???
  //calculation parts

val test2_udf = udf(Test _)
//Test: Test[](val valueone: Double,val valuetwo: Integer,val valuethree: String,val valuefour: Double,val valuefive: Double) => Double
//test2_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function5>,DoubleType,Some(List(DoubleType, IntegerType, StringType, DoubleType, DoubleType)))

还有你的第二个日志

java.lang.ClassCastException: $anonfun$1 cannot be cast to scala.Function1

建议您在 test2_udf 调用中只传递一个参数

df.withColumn("new column", test2_udf(col("green").cast("double"))).show(false)
//java.lang.ClassCastException: A$A30$A$A30$$anonfun$test2_udf$1 cannot be cast to scala.Function1

如果您关注错误消息的cannot be cast to scala.Function1 部分,它会清楚地提示传递给 udf 函数的列数

如果你传递三个参数,那么你会得到关注

df.withColumn("new column", test2_udf(col("green").cast("double"),col("green").cast("double"),col("green").cast("double"))).show(false)
//java.lang.ClassCastException: A$A31$A$A31$$anonfun$test2_udf$1 cannot be cast to scala.Function3

【讨论】:

我应该早点提到这一点,但你更新的部分是我问题第二部分的正确答案(即当我开始遇到错误时) - 所以谢谢:) 很高兴它对您有所帮助。当你有资格时,你总是可以投票;)

以上是关于Scala - 如何将 Dataset[Row] 转换为可添加到 Dataframe 的列的主要内容,如果未能解决你的问题,请参考以下文章

如何将 seq[row] 转换为 scala 中的数据框

将 Json 的 Dataset 列解析为 Dataset<Row>

Scala - 当 Row.get(i) 将检索 null 时如何避免 java.lang.IllegalArgumentException

如何在 Spark 2 Scala 中将 Row 转换为 json

如何使用来自另一个 Dataset<Row> 的记录更新 Dataset<Row>,这些记录在 Spark 中使用 JAVA API 具有相同的模式?

Spark---Dataset