如何“扁平化”具有可变列数的 Spark 模式?

Posted

技术标签:

【中文标题】如何“扁平化”具有可变列数的 Spark 模式?【英文标题】:How to 'flatten' a Spark schema with a variable number of columns? 【发布时间】:2019-02-26 18:26:29 【问题描述】:

这是我创建的 Spark DataFrame 的架构:

root
 |-- id: double (nullable = true)
 |-- sim_scores: struct (nullable = true)
 |    |-- scores: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: integer
 |    |    |    |-- value: vector (valueContainsNull = true)

“sim_scores”结构代表我用于聚合目的的 Scala 案例类。我定制了一个旨在合并这些结构的 UDAF。为了使它们对所有边缘情况都是安全的,它们看起来就像它们一样。让我们假设这个问题,他们必须保持这种状态。

我想将此 DataFrame '扁平化'为:

root
 |-- id: double (nullable = true)
 |-- score_1: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_2: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_3: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
...

'scores' 结构中的外部 MapType 将分数主题映射到文档;表示文档的内部映射将文档中的句子位置映射到向量分数。 'score_1', 'score_2', ... 表示初始 DF 中 'scores' MapType 的所有可能键。

在 json-ish 术语中,如果我的输入如下所示:

 "id": 739874.0,
  "sim_scores": 
    "firstTopicName": 
      1: [1,9,1,0,1,1,4,6],
      2: [5,7,8,2,4,3,1,3],
      ...
    ,
    "anotherTopic": 
      1: [6,8,4,1,3,4,2,0],
      2: [0,1,3,2,4,5,6,2],
      ...
    
  

然后我会得到一个输出

 "id": 739874.0,
  "firstTopicName": 
    1: [1,9,1,0,1,1,4,6],
    2: [5,7,8,2,4,3,1,3],
    ...
  
  "anotherTopic": 
    1: [6,8,4,1,3,4,2,0],
    2: [0,1,3,2,4,5,6,2],
    ...
  

如果我知道主题栏的总数,这将很容易;但我不。主题的数量由用户在运行时设置,输出 DataFrame 具有可变数量的列。保证 >=1,但我需要对其进行设计,以便它可以在必要时与 100 个不同的主题列一起使用。

我该如何实现?

最后一点:我被 Spark 1.6.3 卡住了;因此适用于该版本的解决方案是最好的。但是,我会采取任何方式来做这件事,以期将来实施。

【问题讨论】:

How to get keys and values from MapType column in SparkSQL DataFrame的可能重复 【参考方案1】:

在高层次上,我认为您有两个选择:

    使用数据框 API 切换到 RDD

如果你想继续使用 spark SQL,那么你可以使用selectExpr 并生成选择查询:

it("should flatten using dataframes and spark sql") 
  val sqlContext = new SQLContext(sc)
  val df = sqlContext.createDataFrame(sc.parallelize(rows), schema)
  df.printSchema()
  df.show()
  val numTopics = 3 // input from user
  // fancy logic to generate the select expression
  val selectColumns: Seq[String] = "id" +: 1.to(numTopics).map(i => s"sim_scores['scores']['topic$i']")
  val df2 = df.selectExpr(selectColumns:_*)
  df2.printSchema()
  df2.show()

鉴于此示例数据:

val schema = sql.types.StructType(List(
  sql.types.StructField("id", sql.types.DoubleType, nullable = true),
  sql.types.StructField("sim_scores", sql.types.StructType(List(
    sql.types.StructField("scores", sql.types.MapType(sql.types.StringType, sql.types.MapType(sql.types.IntegerType, sql.types.StringType)), nullable = true)
  )), nullable = true)
))
val rows = Seq(
  sql.Row(1d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(2d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(3d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2"), "topic3" -> Map(1 -> "scores3"))))
)

你得到这个结果:

root
 |-- id: double (nullable = true)
 |-- sim_scores.scores[topic1]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic2]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic3]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)

+---+-------------------------+-------------------------+-------------------------+
| id|sim_scores.scores[topic1]|sim_scores.scores[topic2]|sim_scores.scores[topic3]|
+---+-------------------------+-------------------------+-------------------------+
|1.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|2.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|3.0|        Map(1 -> scores1)|        Map(1 -> scores2)|        Map(1 -> scores3)|
+---+-------------------------+-------------------------+-------------------------+

另一个选项是切换到处理 RDD,您可以在其中根据地图中的键添加更强大的展平逻辑。

【讨论】:

以上是关于如何“扁平化”具有可变列数的 Spark 模式?的主要内容,如果未能解决你的问题,请参考以下文章

组合具有不同列数的 Spark 数据帧

Rcpp 创建具有可变列数的 DataFrame

T-SQL 查询将数据插入到具有可变列数的表中

具有动态列数的 QML TableView

spark如何设置列数的数据集

Android:具有动态列数的列表视图