使用 Spark Scala 使用现有列添加新列

Posted

技术标签:

【中文标题】使用 Spark Scala 使用现有列添加新列【英文标题】:Adding new column using existing one using Spark Scala 【发布时间】:2017-10-09 09:39:38 【问题描述】:

您好,我想在 DataFrame 的每一行中使用现有列添加新列,我正在 Spark Scala 中尝试这样的操作... df 是包含可变列数的数据帧,只能在运行时确定。

// Added new column "docid"
val df_new = appContext.sparkSession.sqlContext.createDataFrame(df.rdd, df.schema.add("docid", DataTypes.StringType))

 df_new.map(x => 
        import appContext.sparkSession.implicits._
      val allVals = (0 to x.size).map(x.get(_)).toSeq
      val values = allVals ++ allVals.mkString("_") 
      Row.fromSeq(values)
    ) 

但这给出的错误是eclipse本身

找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。 方法映射的参数不足:(隐式证据$7:org.apache.spark.sql.Encoder[org.apache.spark.sql.Row])org.apache.spark.sql.Dataset[org.apache. spark.sql.Row]。未指定值参数evidence$7。

请帮忙。

【问题讨论】:

import 应该在map 之外完成。 你能举例说明输入数据和预期输出吗?这应该可以以更有效的方式解决。 有什么理由不能按照 werner 的回答中的建议使用 df.withColumn() 吗?那将是最直接的解决方案 【参考方案1】:

functions object 中的concat_ws 可以提供帮助。

此代码添加了docid 字段

df = df.withColumn("docid", concat_ws("_", df.columns.map(df.col(_)):_*))

假设df 的所有列都是字符串。

【讨论】:

以上是关于使用 Spark Scala 使用现有列添加新列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

如何在 Spark SQL 中向现有 Dataframe 添加新列

在现有列的基础上在 DataFrame 中添加新列

scala/spark 代码中不允许在配置单元中添加列

Spark 根据现有列的映射值创建新列

Spark和Scala,通过映射公用键添加具有来自另一个数据帧的值的新列[重复]