Spark中来自关系数据模型的树/嵌套结构
Posted
技术标签:
【中文标题】Spark中来自关系数据模型的树/嵌套结构【英文标题】:Tree/nested structures in Spark from relational data model 【发布时间】:2019-03-17 19:10:29 【问题描述】:如果我理解正确,我可以将 spark 数据集视为 T
类型的对象列表。如何以父包含子列表的方式连接两个数据集?但是一个孩子也会有自己孩子的名单......
解决这个问题的一种方法是根据键对子项进行groupBy
,但collect_list
只返回一列,我想有更好的方法来做到这一点。
想要的结果基本上是Customer
类型的数据集(客户对象列表?),但还有一些补充:
最终结果将类似于
case class Customer(customer_id: Int, name: String, address: String, age: Int, invoices: List[Invoices])
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String, items: List[Items])
为此,我需要来自以下输入:
case class Customer(customer_id: Int, name: String, address: String, age: Int)
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String)
case class InvoiceItem(item_id: Int, invoice_id: Int, num_of_items: Int, price: Double, total: Double)
val customers_df = Seq(
(11,"customer1", "address1", 10, "F")
,(12,"customer2", "address2", 20, "M")
,(13,"customer3", "address3", 30, "F")
).toDF("customer_id", "name", "address", "age", "sex")
val customers_ds = customers_df.as[Customer].as("c")
customers_ds.show
val invoices_df = Seq(
(21,11, "10101/1", 20181105, "manual")
,(22,11, "10101/2", 20181105, "manual")
,(23,11, "10101/3", 20181105, "manual")
,(24,12, "10101/4", 20181105, "generated")
,(25,12, "10101/5", 20181105, "pos")
).toDF("invoice_id", "customer_id", "invoice_num", "date", "invoice_type")
val invoices_ds = invoices_df.as[Invoice].as("i")
invoices_ds.show
val invoice_items_df = Seq(
(31, 21, 5, 10.0, 50.0)
,(32, 21, 3, 15.0, 45.0)
,(33, 22, 6, 11.0, 66.0)
,(34, 22, 7, 2.0, 14.0)
,(35, 23, 1, 100.0, 100.0)
,(36, 24, 4, 4.0, 16.0)
).toDF("item_id", "invoice_id", "num_of_items", "price", "total")
val invoice_items_ds = invoice_items_df.as[InvoiceItem].as("ii")
invoice_items_ds.show
在表格中是这样的:
+-----------+---------+--------+---+---+
|customer_id| name| address|age|sex|
+-----------+---------+--------+---+---+
| 11|customer1|address1| 10| F|
| 12|customer2|address2| 20| M|
| 13|customer3|address3| 30| F|
+-----------+---------+--------+---+---+
+----------+-----------+-----------+--------+------------+
|invoice_id|customer_id|invoice_num| date|invoice_type|
+----------+-----------+-----------+--------+------------+
| 21| 11| 10101/1|20181105| manual|
| 22| 11| 10101/2|20181105| manual|
| 23| 11| 10101/3|20181105| manual|
| 24| 12| 10101/4|20181105| generated|
| 25| 12| 10101/5|20181105| pos|
+----------+-----------+-----------+--------+------------+
+-------+----------+------------+-----+-----+
|item_id|invoice_id|num_of_items|price|total|
+-------+----------+------------+-----+-----+
| 31| 21| 5| 10.0| 50.0|
| 32| 21| 3| 15.0| 45.0|
| 33| 22| 6| 11.0| 66.0|
| 34| 22| 7| 2.0| 14.0|
| 35| 23| 1|100.0|100.0|
| 36| 24| 4| 4.0| 16.0|
+-------+----------+------------+-----+-----+
【问题讨论】:
【参考方案1】:您似乎正在尝试将规范化数据读入 Scala 对象树。你当然可以用 Spark 做到这一点,但 Spark 可能不是最好的工具。如果数据足够小以适合内存,我认为您的问题是正确的,那么对象关系映射(ORM)库可能更适合这项工作。
如果您仍想使用 Spark,那么 groupBy
和 collect_list
就在正确的道路上。您缺少的是 struct()
函数。
case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)
val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))
case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])
customers
.join(
invoices
.groupBy('customer_id)
.agg(collect_list(struct('*)).as("invoices"))
.withColumnRenamed("customer_id", "id"),
Seq("id"), "left_outer")
.as[CombinedCustomer]
.show
struct('*)
从整行构建StructType
列。您还可以选择任何列,例如,struct('x.as("colA"), 'colB)
。
这会产生
+---+----------------+
| id| invoices|
+---+----------------+
| 1|[[1, 1], [2, 1]]|
+---+----------------+
现在,如果客户数据预计不适合内存,即不能使用简单的collect
,您可以采取多种不同的策略。
最简单的一种,您应该考虑而不是收集给司机,要求独立处理每个客户的数据是可以接受的。在这种情况下,请尝试使用 map
并将每个客户的处理逻辑分配给工作人员。
如果不能接受客户独立处理,一般策略如下:
使用上述方法根据需要将数据聚合到结构化行中。
重新分区数据以确保处理所需的所有内容都在一个分区中。
(可选)sortWithinPartitions
以确保分区中的数据按照您的需要进行排序。
使用mapPartitions
。
【讨论】:
数据量将变得“大”。这里的大意味着每天输入和处理 1 亿条记录。在不考虑模拟/预测的情况下,预计输出将大 200 倍。这就是为什么火花摆在桌面上的原因。struct('*)
不是在这里缺少报价吗?由于它可以使用所有列来创建StructType
,有没有办法根据来自数据库表的类型(int、date...)推断类成员?
输出的总大小与您的处理是否针对每个客户无关紧要。如果输出大于输入,则保留聚合的每个客户数据。如果您需要跨客户进行处理,事情会变得更加复杂,我无法在不了解细节的情况下推荐最佳方法。如果在处理过程中您只需要发票和物品,请确保您使用的是 Spark 2.4+,因为它对读取部分结构数据进行了一些优化。
这个例子很简单地说明了问题;我实际上在做的是同样的事情,但贷款。客户确实发挥了作用,并且必须以在单个工作人员上处理一个客户的所有数据的方式对数据进行分区。可以避免的是将发票(贷款)嵌套在客户对象中,我可以将所有计算基于贷款本身,然后再左连接客户信息,因为它在聚合中起作用。尽管如此,我需要嵌套其他东西以将其包含在一笔贷款中。
这可能值得自己提出问题,但这里是您如何解决问题的要点。首先,聚合到您需要的级别,如上所示。然后,按客户 ID 进行分区,以确保所有客户数据在同一个分区中,并在客户 ID(和贷款 ID 等)上使用 sortWithinPartitions
,以便所有客户数据相邻 + 按您的需要排序。如果您大量增加分区数量,您也许可以使用mapPartitions
并以这种方式轻松处理所有内容。如果你想变得花哨,你可以使用数据的排序来进行高效的迭代。
谢谢你的回答,这就是我要找的。span>
【参考方案2】:
您可以使用 Spark-SQL 并为客户、发票和商品各拥有一个数据集。 然后,您可以简单地在这些数据集之间使用连接和聚合函数来获得所需的输出。
Spark SQL 在 sql 风格和编程方式之间的性能差异可以忽略不计。
【讨论】:
以上是关于Spark中来自关系数据模型的树/嵌套结构的主要内容,如果未能解决你的问题,请参考以下文章