将具有地图值的多个地图合并为自定义案例类实例

Posted

技术标签:

【中文标题】将具有地图值的多个地图合并为自定义案例类实例【英文标题】:Merging multiple maps with map value as custom case class instance 【发布时间】:2019-09-03 05:08:43 【问题描述】:

我想使用 Spark/Scala 合并多个地图。这些地图有一个案例类实例作为值。

以下是相关代码:

case class SampleClass(value1:Int,value2:Int)

val sampleDataDs = Seq(
      ("a",25,Map(1->SampleClass(1,2))),
      ("a",32,Map(1->SampleClass(3,4),2->SampleClass(1,2))),
      ("b",16,Map(1->SampleClass(1,2))),
      ("b",18,Map(2->SampleClass(10,15)))).toDF("letter","number","maps")

输出:

+------+-------+--------------------------+
|letter|number |maps                      |
+------+-------+--------------------------+
|a     |  25   | [1-> [1,2]]              |
|a     |  32   | [1-> [3,4], 2 -> [1,2]]  |
|b     |  16   | [1 -> [1,2]]             |
|b     |  18   | [2 -> [10,15]]           |
+------+-------+--------------------------+

我想根据“字母”列对数据进行分组,以便最终数据集应具有以下预期的最终输出:

+------+---------------------------------+
|letter| maps                            |
+------+---------------------------------+
|a     | [1-> [4,6], 2 -> [1,2]]         |
|b     | [1-> [1,2], 2 -> [10,15]]       |                 
+------+---------------------------------+

我尝试按“字母”分组,然后应用 udf 来聚合地图中的值。以下是我尝试过的:

val aggregatedDs = SampleDataDs.groupBy("letter").agg(collect_list(SampleDataDs("maps")).alias("mapList")) 

输出:

+------+----------------------------------------+
|letter| mapList                                |
+------+-------+--------------------------------+
|a     | [[1-> [1,2]],[1-> [3,4], 2 -> [1,2]]]  |
|b     | [[1-> [1,2]],[2 -> [10,15]]]           |                 
+------+----------------------------------------+ 

在这之后我尝试写一个udf来合并collect_list的输出并得到最终输出:

def mergeMap = udf  valSeq:Seq[Map[Int,SampleClass]] =>
valSeq.flatten.groupBy(_._1).mapValues(x=>(x.map(_._2.value1).reduce(_ + _),x.map(_._2.value2).reduce(_ + _)))


val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))

但是它失败并显示错误消息:

Failed to execute user defined function Caused by :java.lang.classCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to SampleClass

我的 Spark 版本是 2.3.1。任何想法如何获得预期的最终输出?

【问题讨论】:

【参考方案1】:

问题是由于UDF 无法接受案例类作为输入。 Spark 数据框将在内部将您的案例类表示为 Row 对象。因此,可以通过如下更改UDF 输入类型来避免该问题:

val mergeMap = udf((valSeq:Seq[Map[Int, Row]]) => 
  valSeq.flatten
    .groupBy(_._1)
    .mapValues(x=> 
      SampleClass(
        x.map(_._2.getAs[Int]("value1")).reduce(_ + _),
        x.map(_._2.getAs[Int]("value2")).reduce(_ + _)
      )
    )
)

请注意,处理 Row 对象需要进行一些小的额外更改。

运行此代码将导致:

val aggMapDs = aggregatedDs.withColumn("aggValues",mergeMap(col("mapList")))

+------+----------------------------------------------+-----------------------------+
|letter|mapList                                       |aggValues                    |
+------+----------------------------------------------+-----------------------------+
|b     |[Map(1 -> [1,2]), Map(2 -> [10,15])]          |Map(2 -> [10,15], 1 -> [1,2])|
|a     |[Map(1 -> [1,2]), Map(1 -> [3,4], 2 -> [1,2])]|Map(2 -> [1,2], 1 -> [4,6])  |
+------+----------------------------------------------+-----------------------------+

【讨论】:

@fresher:没问题,很乐意提供帮助。 :)【参考方案2】:

DataframeDataset 之间存在细微差别。

Dataset 具有两个不同的 API 特征:强类型 API 和无类型 API,如下表所示。从概念上讲,将 DataFrame 视为通用对象 Dataset[Row] 集合的别名,其中 Row 是通用的无类型 JVM 对象。相比之下,数据集是强类型 JVM 对象的集合,由您在 Scala 中定义的案例类或 Java 中的类决定

当您将Seq 转换为Dataframe 时,类型信息会丢失。

val df: Dataframe = Seq(...).toDf() <-- here

您可以做的是将Seq 转换为Dataset

val typedDs: Dataset[(String, Int, Map[Int, SampleClass])] = Seq(...).toDS()

+---+---+--------------------+
| _1| _2|                  _3|
+---+---+--------------------+
|  a| 25|       [1 -> [1, 2]]|
|  a| 32|[1 -> [3, 4], 2 -...|
|  b| 16|       [1 -> [1, 2]]|
|  b| 18|     [2 -> [10, 15]]|
+---+---+--------------------+

因为Seq 中的***对象是Tuple,Spark 会生成虚拟列名。

现在您应该注意返回类型,类型化数据集上的函数会丢失类型信息。

val untyped: Dataframe = typedDs
  .groupBy("_1")
  .agg(collect_list(typedDs("_3")).alias("mapList"))

为了使用类型化 API,您应该明确定义类型

val aggregatedDs = sampleDataDs
      .groupBy("letter")
      .agg(collect_list(sampleDataDs("maps")).alias("mapList"))

val toTypedAgg: Dataset[(String, Array[Map[Int, SampleClass]])] = aggregatedDs
 .as[(String, Array[Map[Int, SampleClass]])] //<- here

很遗憾,udf 不能像 there are 那样工作,Spark 可以为其推断架构的有限类型。

toTypedAgg.withColumn("aggValues", mergeMap1(col("mapList"))).show()

Schema for type org.apache.spark.sql.Dataset[(String, Array[Map[Int,SampleClass]])] is not supported

你可以做的是通过Datasetmap

val mapped = toTypedAgg.map(v => 
  (v._1, v._2.flatten.groupBy(_._1).mapValues(x=>(x.map(_._2.value1).sum,x.map(_._2.value2).sum)))
)

+---+----------------------------+
|_1 |_2                          |
+---+----------------------------+
|b  |[2 -> [10, 15], 1 -> [1, 2]]|
|a  |[2 -> [1, 2], 1 -> [4, 6]]  |
+---+----------------------------+

【讨论】:

以上是关于将具有地图值的多个地图合并为自定义案例类实例的主要内容,如果未能解决你的问题,请参考以下文章

安卓高德地图开发java如何调用js自定义图层

php Wordpress yoast seo插件,为自定义URL生成自定义站点地图

php Wordpress yoast seo插件,为自定义URL生成自定义站点地图

Flutter 多自定义图标未显示在地图视图上

在高德地图上添加自定义Marker

如何从 Hadoop 中的地图程序中输出具有列表等数据结构的自定义类