在 Apache Spark SQL 中对多行进行操作
Posted
技术标签:
【中文标题】在 Apache Spark SQL 中对多行进行操作【英文标题】:Operating on Multiple Rows in Apache Spark SQL 【发布时间】:2015-12-07 18:16:10 【问题描述】:我正在尝试在 Apache Spark SQL 中创建一个函数,该函数对多行数据进行操作,但无法在 Spark 中找到直接执行此操作的方法 - 在 Java 中。
我目前的解决方案是从 Spark DataFrame 中提取数据并放入 Java 列表中进行处理,然后再返回到 Spark DataFrame。这在性能方面并不理想。
最好的选择似乎是Window functions,但不幸的是,这些需要 Hive 上下文,我无权访问。 explode() function 似乎是另一种选择,但同样,这是 Scala 特有的,我无法让它在 Java 中工作。
也许这可以通过将 DataFrame 转换回 RDD 来完成?
如果有人对如何在 Java 中为 Apache Spark SQL 完成此操作有任何提示或建议,我们将不胜感激。谢谢你。
更新:提供示例:
+----------+-----------+------------+
| Item | Timestamp | Difference |
+----------+-----------+------------+
| A | 11:00 | 02:00 |
| A | 13:00 | - |
+----------+-----------+------------+
| B | 09:00 | - |
+----------+-----------+------------+
| C | 15:15 | 00:20 |
| C | 15:35 | 01:30 |
| C | 17:05 | - |
+----------+-----------+------------+
所以在示例中,我尝试对按项目分组的行对进行操作,以计算每个项目行之间的时间差。
使用 SQL 中的 LAG() 和 LEAD() 函数可以完成这样的任务,但这些需要 Spark 中的 Hive。
【问题讨论】:
您能提供一个您希望它如何工作的示例吗?您想要一个函数应用于整个行集还是仅应用于一部分?我的第一个想法是取决于听起来像带有 UDF 的 GROUPBY 的复杂性。 抱歉不清楚 - 我正在尝试对多行而不是整个行集进行操作。我已按要求提供了上面的示例。 【参考方案1】:从 Spark 1.5 开始,您现在可以定义 UDAF 或用户定义的聚合函数,以便您对输入数据组执行自定义聚合。我认为这可能是我所见过的最接近您正在寻找的东西。
通常,您需要创建一个扩展 UserDefinedAggregateFunction
的类并实现涉及初始化、合并和聚合的所需方法。
创建后,您可以将其实例化、注册,然后在 SQL 中使用它。
val myAggregation = new MyAggregation
sqlContext.udf.register("MY_AGG", myAggregation)
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
【讨论】:
谢谢。我熟悉 UDAF,但我的印象是他们对所有数据而不是子集进行操作。我想这可以通过创建所有数据的列表然后在evaluate()
函数中执行时间差的计算来完成 - 但这与从 Spark 中提取数据到列表中的效果不同并且用 Java 处理它?
@ab853 好吧,是的,它们对整个数据集进行操作,但是如果将其与 GROUP BY Item
结合使用,它不会应用于每个子集吗?即使没有分组,您也可以根据字段进行组合,对吗?我对 UDAF 很陌生,但我认为 Spark 会比纯列表更好地优化它。
UserDefinedAggregateFunction 已弃用。 Aggregator[IN, BUF, OUT] 现在应该通过 functions.udaf(agg) 方法注册为 UDF。以上是关于在 Apache Spark SQL 中对多行进行操作的主要内容,如果未能解决你的问题,请参考以下文章
是否可以在 Pyspark 中对 DataFrame 进行子类化?
Apache Spark 在 DataFrame 中插入多行