Translating spark dataframe aggregations to SQL query; problems with window, groupby, and how to aggregate?
Posted: 2018-05-28 00:29:01

I'm working through the data from Spark: The Definitive Guide, and I'm using Java just to be well-rounded.
I'm reading the data in from CSV files and creating a temporary view table as follows:
Dataset<Row> staticDataFrame = spark.read().format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/data/retail-data/by-day/*.csv");
staticDataFrame.createOrReplaceTempView("SalesInfo");
spark.sql("SELECT CustomerID, (UnitPrice * Quantity) AS total_cost, InvoiceDate from SalesInfo").show(10);
This works fine and returns the following data:
+----------+------------------+--------------------+
|CustomerID| total_cost| InvoiceDate|
+----------+------------------+--------------------+
| 14075.0| 85.92|2011-12-05 08:38:...|
| 14075.0| 25.0|2011-12-05 08:38:...|
| 14075.0|39.599999999999994|2011-12-05 08:38:...|
| 14075.0| 30.0|2011-12-05 08:38:...|
| 14075.0|15.299999999999999|2011-12-05 08:38:...|
| 14075.0| 40.8|2011-12-05 08:38:...|
| 14075.0| 39.6|2011-12-05 08:38:...|
| 14075.0| 40.56|2011-12-05 08:38:...|
| 18180.0| 17.0|2011-12-05 08:39:...|
| 18180.0| 17.0|2011-12-05 08:39:...|
+----------+------------------+--------------------+
only showing top 10 rows
I run into trouble when I try to group it by CustomerID:
spark.sql("SELECT CustomerID, (UnitPrice * Quantity) AS total_cost, InvoiceDate from SalesInfo GROUP BY CustomerID").show(10);
I get:
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'salesinfo.`UnitPrice`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.
I understand what I'm doing wrong, i.e. the query doesn't say how to aggregate total_cost and InvoiceDate, but I don't know how to express that on the SQL side. The book uses Spark's aggregation functions and does it like this:
staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
But I'm trying to understand how to do this with a SQL statement, as a learning exercise.
Any help on how to do this with a query would be appreciated.
I'm trying to figure out how to do this, first so that I just get a total per CustomerID, and then how to get the full functionality of the book's Spark statement, where the totals are broken down per CustomerID by hour.
Thanks.
Edit: here is where the data comes from; I'm just reading it all into one dataset:
https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data/by-day
Comments:

"Do you have a link to the data somewhere so we can import it and try to reproduce what you're doing?"
"Good idea, I'll add it from the author's site."

Answer 1:

So, in SQL, I interpreted what you're asking for as UnitPrice * Quantity per customer per hour:
select
customerid,
sum(unitprice * quantity) as total_cost,
cast(cast(InvoiceDate as date) as varchar) + ' ' + cast(DATEPART(HH, InvoiceDate) as varchar) + ':00'
from [retail-data]
group by CustomerID, cast(cast(InvoiceDate as date) as varchar) + ' ' + cast(DATEPART(HH, InvoiceDate) as varchar) + ':00'
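Note that the query above is written in SQL Server syntax (DATEPART, varchar casts, and string concatenation with +), which Spark SQL will not parse. As a rough sketch only, and assuming Spark SQL's built-in window() grouping function (my substitution, not part of the original answer), the same per-customer, per-hour aggregation against the SalesInfo temp view from the question could look like this:

spark.sql(
    "SELECT CustomerID, window(InvoiceDate, '1 hour') AS hour_window, "
  + "       SUM(UnitPrice * Quantity) AS total_cost "
  + "FROM SalesInfo "
  + "GROUP BY CustomerID, window(InvoiceDate, '1 hour')").show(10);

Swapping '1 hour' for '1 day' should reproduce the daily window used in the book's DataFrame version.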
Answer 2:

To complete Geoff Hacker's answer, you can use the explain(true) method on the DataFrame object to inspect the execution plan.
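As a minimal sketch of how that call could be wired up from the question's Java code (the variable name and the static functions import are my assumptions, not from the answer):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

Dataset<Row> windowedTotals = staticDataFrame
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost");
// Prints the parsed, analyzed, optimized and physical plans.
windowedTotals.explain(true);

Running it against the same data, the physical-plan section of the output looks like this: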
== Physical Plan ==
*(2) HashAggregate(keys=[CustomerId#16, window#41], functions= [sum(total_cost#26)], output=[CustomerId#16, window#41, sum(total_cost)#35])
+- Exchange hashpartitioning(CustomerId#16, window#41, 200)
+- *(1) HashAggregate(keys=[CustomerId#16, window#41], functions=[partial_sum(total_cost#26)], output=[CustomerId#16, window#41, sum#43])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) as double) = (cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) THEN (CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) + 1) ELSE CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) END + 0) - 1) * 86400000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) as double) = (cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) THEN (CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) + 1) ELSE CEIL((cast((precisetimestampconversion(InvoiceDate#14, TimestampType, LongType) - 0) as double) / 8.64E10)) END + 0) - 1) * 86400000000) + 86400000000), LongType, TimestampType)) AS window#41, CustomerId#16, (UnitPrice#15 * cast(Quantity#13 as double)) AS total_cost#26]
+- *(1) Filter isnotnull(InvoiceDate#14)
+- *(1) FileScan csv [Quantity#13,InvoiceDate#14,UnitPrice#15,CustomerID#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/spark/retail/2010-12-01.csv, file:/tmp/spar..., PartitionFilters: [], PushedFilters: [IsNotNull(InvoiceDate)], ReadSchema: struct<Quantity:int,InvoiceDate:timestamp,UnitPrice:double,CustomerID:double>
As you can see, Spark builds an aggregation key from CustomerId and the window (each day, 00:00:00 - 23:59:59) [HashAggregate(keys=[CustomerId#16, window#41])] and moves all rows with the same key to a single partition (the Exchange hashpartitioning step). Moving data between partitions like this is called a shuffle. Afterwards it executes the SUM(...) function on the accumulated data.
That said, a GROUP BY expression with one key should produce exactly one row per key. So if CustomerID is defined as the key in the initial query while total_cost and InvoiceDate appear in the projection, the engine has no way to return a single row per CustomerID, because one CustomerID can have many InvoiceDates. The SQL language is no exception to this rule.
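To make the rule concrete, here is a minimal sketch (mine, not from the answer) where every selected column is either in the GROUP BY or wrapped in an aggregate, which is enough to get the per-customer totals from the question without the AnalysisException:

// Every non-grouped column is aggregated, so one row per CustomerID is well defined.
spark.sql(
    "SELECT CustomerID, SUM(UnitPrice * Quantity) AS total_cost "
  + "FROM SalesInfo GROUP BY CustomerID").show(10);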