在 Spark SQL 中使用 collect_list 和 collect_set
Posted
技术标签:
【中文标题】在 Spark SQL 中使用 collect_list 和 collect_set【英文标题】:Use collect_list and collect_set in Spark SQL 【发布时间】:2016-02-20 21:01:30 【问题描述】:根据docs,collect_set
和collect_list
函数应该在 Spark SQL 中可用。但是,我无法让它工作。我正在使用Docker image 运行 Spark 1.6.0。
我正在尝试在 Scala 中执行此操作:
import org.apache.spark.sql.functions._
df.groupBy("column1")
.agg(collect_set("column2"))
.show()
并在运行时收到以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set;
也尝试使用pyspark
,但也失败了。文档说明这些函数是 Hive UDAF 的别名,但我无法确定启用这些函数。
如何解决这个问题?谢谢!
【问题讨论】:
【参考方案1】:Spark 2.0+:
SPARK-10605 引入了原生 collect_list
和 collect_set
实现。不再需要具有 Hive 支持的 SparkSession
或 HiveContext
。
Spark 2.0-SNAPSHOT (2016-05-03 之前):
您必须为给定的SparkSession
启用 Hive 支持:
在 Scala 中:
val spark = SparkSession.builder
.master("local")
.appName("testing")
.enableHiveSupport() // <- enable Hive support.
.getOrCreate()
在 Python 中:
spark = (SparkSession.builder
.enableHiveSupport()
.getOrCreate())
Spark :
为了能够使用 Hive UDF(请参阅 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF),您已经使用了支持 Hive 构建的 Spark(当您使用预构建的二进制文件时已经涵盖了这一点,这里似乎就是这种情况)并使用初始化 SparkContext
HiveContext
.
在 Scala 中:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = new HiveContext(sc)
在 Python 中:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
【讨论】:
那么文档中的 1.6.1 是可用的“@since 1.6.0”,但我仍然收到该错误 嘿@zero323,我正在尝试使用 Spark 1.5.0 中的 'collect_list' 功能。我已经创建了配置单元上下文,但不知道如何导入该函数。这不会编译: .groupBy(providerData("PRVSEQ"), providerData("PROV_NUM")) .agg(collect_list(regexp_replace(triggerReport("match_type"), "_(Individual|Practice)Model.", "" ))) @VijayRatnagiri 已在 1.6 中引入。据我记得,您应该能够在 1.5 中对已注册的临时表使用原始 SQL 查询。 @KhachaturStepanyan 在 1.6 中你仍然需要 Hive 支持。以上是关于在 Spark SQL 中使用 collect_list 和 collect_set的主要内容,如果未能解决你的问题,请参考以下文章
如何在 spark dataframes/spark sql 中使用模式读取 json