以编程方式将几列添加到 Spark DataFrame
Posted
技术标签:
【中文标题】以编程方式将几列添加到 Spark DataFrame【英文标题】:Programmatically adding several columns to Spark DataFrame 【发布时间】:2015-09-15 07:41:43 【问题描述】:我在 scala 中使用 spark。
我有一个包含 3 列的数据框:ID、时间、RawHexdata。 我有一个用户定义的函数,它接受 RawHexData 并将其扩展为 X 更多列。重要的是要说明每一行 X 是相同的(列不变)。但是,在收到第一个数据之前,我不知道这些列是什么。但是一旦有了头,我就可以推断出来了。
我想要第二个带有上述列的数据框:Id,Time,RawHexData,NewCol1,...,NewCol3。
我能想到的“最简单”的方法是: 1. 将每一行反序列化成json(这里每条数据tyoe都是可序列化的) 2.添加我的新栏目, 3. 从修改后的 json 中反序列化一个新的数据帧,
但是,这似乎是一种浪费,因为它涉及 2 个昂贵且冗余的 json 序列化步骤。我正在寻找更干净的模式。
使用 case-classes 似乎是个坏主意,因为我不知道列数或列名。
【问题讨论】:
您能否提供更多详细信息?RawHexdata
中可能包含的数据示例。
在满足某些条件后,您始终可以应用.withColumn()
函数
Rawhexdata 是由一堆嵌入式设备发送的巨大二进制 blob。 I 包含将被反序列化为其他平面数字数据的数据:双精度数、整数、复数等。稍后我想让分析师使用 Sparksql 查询这些数据。但是,当数据在 blob 中时,这是不可能的,所以我编写了一个 UDF“parseblob”,它接受一个 blob 并返回一个 map/json 对象(我可以更改返回类型以适应解决方案)。我希望这张地图的内容是另一个表中的列,其中每一行都与原始原始数据相关。
@niemand,withcolumn 一次允许一个 cloumn。无论如何我可以使用 withcolumn 而不为我添加的每一列重新解析整个 blob? (例如,我想添加 3 列)。如果是这样,我可以通过重复调用 withcolumn 轻松添加一个添加多个列的函数,但是,我能想到的每个 withcolumn 合成器都需要每行多次解析原始数据。我对 Scala 不是很熟悉,也许有什么办法……
【参考方案1】:
动态扩展DataFrame
可以对行RDD 进行操作,您可以通过调用dataFrame.rdd
获得该行。拥有Row
实例,您可以访问RawHexdata
列并解析包含的数据。通过将新解析的列添加到结果Row
,您几乎解决了您的问题。将RDD[Row]
转换回DataFrame
唯一需要做的就是为新列生成模式数据。为此,您可以在驱动程序上收集单个 RawHexdata
值,然后提取列类型。
以下代码说明了这种方法。
object App
case class Person(name: String, age: Int)
def main(args: Array[String])
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
val dataFrame = input.df
dataFrame.show()
// create the extended rows RDD
val rowRDD = dataFrame.rdd.map
row =>
val blob = row(1).asInstanceOf[Int]
val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
Row.fromSeq(row.toSeq.init ++ newColumns)
val schema = dataFrame.schema
// we know that the new columns are all integers
val newColumns = StructType
Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
val newSchema = StructType(schema.init ++ newColumns)
val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)
newDataFrame.show()
【讨论】:
谢谢,虽然我不知道每个具体数值的类型。我可以添加一个“开关”并构建 Seq 函数 正是@eshalev,假设您所有的RawHexdata
包含相同的列,您可以收集一个RawHexdata
对象并计算结果列的数据类型。【参考方案2】:
SELECT
是您无需返回 RDD 即可解决问题的朋友。
case class Entry(Id: String, Time: Long)
val entries = Seq(
Entry("x1", 100L),
Entry("x2", 200L)
)
val newColumns = Seq("NC1", "NC2", "NC3")
val df = spark.createDataFrame(entries)
.select(col("*") +: (newColumns.map(c => lit(null).as(c))): _*)
df.show(false)
+---+----+----+----+----+
|Id |Time|NC1 |NC2 |NC3 |
+---+----+----+----+----+
|x1 |100 |null|null|null|
|x2 |200 |null|null|null|
+---+----+----+----+----+
【讨论】:
以上是关于以编程方式将几列添加到 Spark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
以编程方式将 UILabel 和 UIImageViews 添加到 UIScrollView
添加新行时,如何以编程方式滚动到 Flex Spark Textarea 的末尾?