在 Spark SQL 中使用 collect_list 和 collect_set

Posted

技术标签:

【中文标题】在 Spark SQL 中使用 collect_list 和 collect_set【英文标题】:Use collect_list and collect_set in Spark SQL 【发布时间】:2016-02-20 21:01:30 【问题描述】:

根据docs,collect_setcollect_list 函数应该在 Spark SQL 中可用。但是,我无法让它工作。我正在使用Docker image 运行 Spark 1.6.0。

我正在尝试在 Scala 中执行此操作:

import org.apache.spark.sql.functions._ 

df.groupBy("column1") 
  .agg(collect_set("column2")) 
  .show() 

并在运行时收到以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set; 

也尝试使用pyspark,但也失败了。文档说明这些函数是 Hive UDAF 的别名,但我无法确定启用这些函数。

如何解决这个问题?谢谢!

【问题讨论】:

【参考方案1】:

Spark 2.0+

SPARK-10605 引入了原生 collect_listcollect_set 实现。不再需要具有 Hive 支持的 SparkSessionHiveContext

Spark 2.0-SNAPSHOT (2016-05-03 之前)

您必须为给定的SparkSession 启用 Hive 支持:

在 Scala 中:

val spark = SparkSession.builder
  .master("local")
  .appName("testing")
  .enableHiveSupport()  // <- enable Hive support.
  .getOrCreate()

在 Python 中:

spark = (SparkSession.builder
    .enableHiveSupport()
    .getOrCreate())

Spark :

为了能够使用 Hive UDF(请参阅 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF),您已经使用了支持 Hive 构建的 Spark(当您使用预构建的二进制文件时已经涵盖了这一点,这里似乎就是这种情况)并使用初始化 SparkContext HiveContext.

在 Scala 中:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = new HiveContext(sc) 

在 Python 中:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

【讨论】:

那么文档中的 1.6.1 是可用的“@since 1.6.0”,但我仍然收到该错误 嘿@zero323,我正在尝试使用 Spark 1.5.0 中的 'collect_list' 功能。我已经创建了配置单元上下文,但不知道如何导入该函数。这不会编译: .groupBy(providerData("PRVSEQ"), providerData("PROV_NUM")) .agg(collect_list(regexp_replace(triggerReport("match_type"), "_(Individual|Practice)Model.", "" ))) @VijayRatnagiri 已在 1.6 中引入。据我记得,您应该能够在 1.5 中对已注册的临时表使用原始 SQL 查询。 @KhachaturStepanyan 在 1.6 中你仍然需要 Hive 支持。

以上是关于在 Spark SQL 中使用 collect_list 和 collect_set的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark dataframes/spark sql 中使用模式读取 json

在查询中使用通配符在 Spark.SQL() 中转义字符

无法在 spark 中使用 hive 支持创建 sql 上下文

在 spark.sql 的选择中使用 cast()

sql 在 spark sql 数据帧查询中使用分区

如何在数据块中使用 Spark sql 连接 Spark 数据框列