如何“扁平化”具有可变列数的 Spark 模式?
Posted
技术标签:
【中文标题】如何“扁平化”具有可变列数的 Spark 模式?【英文标题】:How to 'flatten' a Spark schema with a variable number of columns? 【发布时间】:2019-02-26 18:26:29 【问题描述】:这是我创建的 Spark DataFrame 的架构:
root
|-- id: double (nullable = true)
|-- sim_scores: struct (nullable = true)
| |-- scores: map (nullable = true)
| | |-- key: string
| | |-- value: map (valueContainsNull = true)
| | | |-- key: integer
| | | |-- value: vector (valueContainsNull = true)
“sim_scores”结构代表我用于聚合目的的 Scala 案例类。我定制了一个旨在合并这些结构的 UDAF。为了使它们对所有边缘情况都是安全的,它们看起来就像它们一样。让我们假设这个问题,他们必须保持这种状态。
我想将此 DataFrame '扁平化'为:
root
|-- id: double (nullable = true)
|-- score_1: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
|-- score_2: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
|-- score_3: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
...
'scores' 结构中的外部 MapType 将分数主题映射到文档;表示文档的内部映射将文档中的句子位置映射到向量分数。 'score_1', 'score_2', ... 表示初始 DF 中 'scores' MapType 的所有可能键。
在 json-ish 术语中,如果我的输入如下所示:
"id": 739874.0,
"sim_scores":
"firstTopicName":
1: [1,9,1,0,1,1,4,6],
2: [5,7,8,2,4,3,1,3],
...
,
"anotherTopic":
1: [6,8,4,1,3,4,2,0],
2: [0,1,3,2,4,5,6,2],
...
然后我会得到一个输出
"id": 739874.0,
"firstTopicName":
1: [1,9,1,0,1,1,4,6],
2: [5,7,8,2,4,3,1,3],
...
"anotherTopic":
1: [6,8,4,1,3,4,2,0],
2: [0,1,3,2,4,5,6,2],
...
如果我知道主题栏的总数,这将很容易;但我不。主题的数量由用户在运行时设置,输出 DataFrame 具有可变数量的列。保证 >=1,但我需要对其进行设计,以便它可以在必要时与 100 个不同的主题列一起使用。
我该如何实现?
最后一点:我被 Spark 1.6.3 卡住了;因此适用于该版本的解决方案是最好的。但是,我会采取任何方式来做这件事,以期将来实施。
【问题讨论】:
How to get keys and values from MapType column in SparkSQL DataFrame的可能重复 【参考方案1】:在高层次上,我认为您有两个选择:
-
使用数据框 API
切换到 RDD
如果你想继续使用 spark SQL,那么你可以使用selectExpr
并生成选择查询:
it("should flatten using dataframes and spark sql")
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(sc.parallelize(rows), schema)
df.printSchema()
df.show()
val numTopics = 3 // input from user
// fancy logic to generate the select expression
val selectColumns: Seq[String] = "id" +: 1.to(numTopics).map(i => s"sim_scores['scores']['topic$i']")
val df2 = df.selectExpr(selectColumns:_*)
df2.printSchema()
df2.show()
鉴于此示例数据:
val schema = sql.types.StructType(List(
sql.types.StructField("id", sql.types.DoubleType, nullable = true),
sql.types.StructField("sim_scores", sql.types.StructType(List(
sql.types.StructField("scores", sql.types.MapType(sql.types.StringType, sql.types.MapType(sql.types.IntegerType, sql.types.StringType)), nullable = true)
)), nullable = true)
))
val rows = Seq(
sql.Row(1d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
sql.Row(2d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
sql.Row(3d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2"), "topic3" -> Map(1 -> "scores3"))))
)
你得到这个结果:
root
|-- id: double (nullable = true)
|-- sim_scores.scores[topic1]: map (nullable = true)
| |-- key: integer
| |-- value: string (valueContainsNull = true)
|-- sim_scores.scores[topic2]: map (nullable = true)
| |-- key: integer
| |-- value: string (valueContainsNull = true)
|-- sim_scores.scores[topic3]: map (nullable = true)
| |-- key: integer
| |-- value: string (valueContainsNull = true)
+---+-------------------------+-------------------------+-------------------------+
| id|sim_scores.scores[topic1]|sim_scores.scores[topic2]|sim_scores.scores[topic3]|
+---+-------------------------+-------------------------+-------------------------+
|1.0| Map(1 -> scores1)| Map(1 -> scores2)| null|
|2.0| Map(1 -> scores1)| Map(1 -> scores2)| null|
|3.0| Map(1 -> scores1)| Map(1 -> scores2)| Map(1 -> scores3)|
+---+-------------------------+-------------------------+-------------------------+
另一个选项是切换到处理 RDD,您可以在其中根据地图中的键添加更强大的展平逻辑。
【讨论】:
以上是关于如何“扁平化”具有可变列数的 Spark 模式?的主要内容,如果未能解决你的问题,请参考以下文章