以编程方式将几列添加到 Spark DataFrame

Posted

技术标签:

【中文标题】以编程方式将几列添加到 Spark DataFrame【英文标题】:Programmatically adding several columns to Spark DataFrame 【发布时间】:2015-09-15 07:41:43 【问题描述】:

我在 scala 中使用 spark。

我有一个包含 3 列的数据框:ID、时间、RawHexdata。 我有一个用户定义的函数,它接受 RawHexData 并将其扩展为 X 更多列。重要的是要说明每一行 X 是相同的(列不变)。但是,在收到第一个数据之前,我不知道这些列是什么。但是一旦有了头,我就可以推断出来了。

我想要第二个带有上述列的数据框:Id,Time,RawHexData,NewCol1,...,NewCol3。

我能想到的“最简单”的方法是: 1. 将每一行反序列化成json(这里每条数据tyoe都是可序列化的) 2.添加我的新栏目, 3. 从修改后的 json 中反序列化一个新的数据帧,

但是,这似乎是一种浪费,因为它涉及 2 个昂贵且冗余的 json 序列化步骤。我正在寻找更干净的模式。

使用 case-classes 似乎是个坏主意,因为我不知道列数或列名。

【问题讨论】:

您能否提供更多详细信息? RawHexdata 中可能包含的数据示例。 在满足某些条件后,您始终可以应用.withColumn()函数 Rawhexdata 是由一堆嵌入式设备发送的巨大二进制 blob。 I 包含将被反序列化为其他平面数字数据的数据:双精度数、整数、复数等。稍后我想让分析师使用 Sparksql 查询这些数据。但是,当数据在 blob 中时,这是不可能的,所以我编写了一个 UDF“parseblob”,它接受一个 blob 并返回一个 map/json 对象(我可以更改返回类型以适应解决方案)。我希望这张地图的内容是另一个表中的列,其中每一行都与原始原始数据相关。 @niemand,withcolumn 一次允许一个 cloumn。无论如何我可以使用 withcolumn 而不为我添加的每一列重新解析整个 blob? (例如,我想添加 3 列)。如果是这样,我可以通过重复调用 withcolumn 轻松添加一个添加多个列的函数,但是,我能想到的每个 withcolumn 合成器都需要每行多次解析原始数据。我对 Scala 不是很熟悉,也许有什么办法…… 【参考方案1】:

动态扩展DataFrame 可以对行RDD 进行操作,您可以通过调用dataFrame.rdd 获得该行。拥有Row 实例,您可以访问RawHexdata 列并解析包含的数据。通过将新解析的列添加到结果Row,您几乎解决了您的问题。将RDD[Row] 转换回DataFrame 唯一需要做的就是为新列生成模式数据。为此,您可以在驱动程序上收集单个 RawHexdata 值,然后提取列类型。

以下代码说明了这种方法。

object App 

  case class Person(name: String, age: Int)

  def main(args: Array[String]) 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
    val dataFrame = input.df

    dataFrame.show()

    // create the extended rows RDD
    val rowRDD = dataFrame.rdd.map
      row =>
        val blob = row(1).asInstanceOf[Int]
        val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
        Row.fromSeq(row.toSeq.init ++ newColumns)
    

    val schema = dataFrame.schema

    // we know that the new columns are all integers
    val newColumns = StructType
      Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
    

    val newSchema = StructType(schema.init ++ newColumns)

    val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)

    newDataFrame.show()
  

【讨论】:

谢谢,虽然我不知道每个具体数值的类型。我可以添加一个“开关”并构建 Seq 函数 正是@eshalev,假设您所有的RawHexdata 包含相同的列,您可以收集一个RawHexdata 对象并计算结果列的数据类型。【参考方案2】:

SELECT 是您无需返回 RDD 即可解决问题的朋友。

case class Entry(Id: String, Time: Long)

val entries = Seq(
  Entry("x1", 100L),
  Entry("x2", 200L)
)

val newColumns = Seq("NC1", "NC2", "NC3")

val df = spark.createDataFrame(entries)
  .select(col("*") +: (newColumns.map(c => lit(null).as(c))): _*)

df.show(false)

+---+----+----+----+----+
|Id |Time|NC1 |NC2 |NC3 |
+---+----+----+----+----+
|x1 |100 |null|null|null|
|x2 |200 |null|null|null|
+---+----+----+----+----+

【讨论】:

以上是关于以编程方式将几列添加到 Spark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

以编程方式将 UILabel 和 UIImageViews 添加到 UIScrollView

添加新行时,如何以编程方式滚动到 Flex Spark Textarea 的末尾?

如何将几列熊猫数据框转换为另一条记录中的 JSON 记录?

JMP图形图表,怎么才能将几列数据的散点图和箱线图集中体现在同一个图形中,如图所示,谢谢!

Spark SQL读写方法

如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)