提取数组元素并将它们映射到案例类

Posted

技术标签:

【中文标题】提取数组元素并将它们映射到案例类【英文标题】:Extracting array elements and mapping them to case class 【发布时间】:2021-12-24 11:38:28 【问题描述】:

以下是我在 caseclass 数据集上执行 groupByKey、mapGroups 和 joinWith 操作后得到的输出:

+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1                            |_2                                                                                                                                                        |
+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[IND0001,Christopher,Black]   |null                                                                                                                                                            |
|[IND0002,Madeleine,Kerr]      |[IND0002,WrappedArray([IND0002,ACC0155,323], [IND0002,ACC0262,60])]                                                                                             |
|[IND0003,Sarah,Skinner]       |[IND0003,WrappedArray([IND0003,ACC0235,631], [IND0003,ACC0486,400], [IND0003,ACC0540,53])]                                                                      |
|[IND0004,Rachel,Parsons]      |[IND0004,WrappedArray([IND0004,ACC0116,965])]                                                                                                                   |
|[IND0005,Oliver,Johnston]     |[IND0005,WrappedArray([IND0005,ACC0146,378], [IND0005,ACC0201,34], [IND0005,ACC0450,329])]                                                                      |
|[IND0006,Carl,Metcalfe]       |[IND0006,WrappedArray([IND0006,ACC0052,57], [IND0006,ACC0597,547])]                                                                                             |

代码如下:

val test = accountDS.groupByKey(_.customerId).mapGroups case (id, xs) => (id, xs.toSeq)
  test.show(false)

val newTest = customerDS.joinWith(test, customerDS("customerId") === test("_1"), "leftouter")
newTest.show(500,false)

现在我想获取数组并以如下格式输出:

 +----------+-----------+----------+---------------------------------------------------------------------+--------------+------------+-----------------+
 * |customerId|forename   |surname   |accounts                                                             |numberAccounts|totalBalance|averageBalance   |
 * +----------+-----------+----------+---------------------------------------------------------------------+--------------+------------+-----------------+
 * |IND0001   |Christopher|Black     |[]                                                                   |0             |0           |0.0              |
 * |IND0002   |Madeleine  |Kerr      |[[IND0002,ACC0155,323], [IND0002,ACC0262,60]]                        |2             |383         |191.5            |
 * |IND0003   |Sarah      |Skinner   |[[IND0003,ACC0235,631], [IND0003,ACC0486,400], [IND0003,ACC0540,53]] |3             |1084        |361.3333333333333|

注意:我根本不能使用 spark.sql.functions._ --> 培训学院规则 :(

如何获得上述输出,该输出应映射到案例类,如下所示:

case class CustomerAccountOutput(
                                    customerId: String,
                                    forename: String,
                                    surname: String,
                                    //Accounts for this customer
                                    accounts: Seq[AccountData],
                                    //Statistics of the accounts
                                    numberAccounts: Int,
                                    totalBalance: Long,
                                    averageBalance: Double
                                  )

我真的需要这方面的帮助。在没有有效解决方案的情况下坚持了数周。

【问题讨论】:

【参考方案1】:

假设您有以下 DataFrame:

val sourceDf = Seq((1, Array("aa", "CA")), (2, Array("bb", "OH"))).toDF("id", "data_arr")
sourceDf.show()

// output:
+---+--------+
| id|data_arr|
+---+--------+
|  1|[aa, CA]|
|  2|[bb, OH]|
+---+--------+

并且您想将其转换为以下架构:

val destSchema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("state", StringType, true)
))

你可以这样做:

val destDf: DataFrame = sourceDf
.map  sourceRow =>
  Row(sourceRow(0), sourceRow.getAs[mutable.WrappedArray[String]](1)(0), sourceRow.getAs[mutable.WrappedArray[String]](1)(1))
(RowEncoder(destSchema))

destDf.show()

// output:
+---+----+-----+
| id|name|state|
+---+----+-----+
|  1|  aa|   CA| 
|  2|  bb|   OH|
+---+----+-----+

【讨论】:

出现错误:无法解析符号 getAs。 这个 sn-p 是使用 spark 3.0.2 和 scala 2.12 测试的。您是否尝试按原样运行它? 编辑了问题,让您更好地了解正在发生的事情。 sn-p 工作正常,但是当我尝试按照我的代码实现它时,它无法识别 getAs。

以上是关于提取数组元素并将它们映射到案例类的主要内容,如果未能解决你的问题,请参考以下文章

scala 将元组解包到案例类参数和附加的 zip 两个序列中

如何从字符串中提取整个字符串范围的子字符串并将它们放在一个数组中?

pandas索引取数

从字符串中提取单词并将它们移动到数组中

如何比较字典值中的多个数组,并将每个数组元素的字典键映射到新数组/列表中

slice()方法:截取数组中的元素