将 SQL 查询转换为 Spark Dataframe 结构化数据处理

Posted

技术标签:

【中文标题】将 SQL 查询转换为 Spark Dataframe 结构化数据处理【英文标题】:Converting SQL query to Spark Dataframe structured data processing 【发布时间】:2018-08-19 06:45:55 【问题描述】:

我想convert下面的queryspark dataframe

sqlContext.sql("SELECT d.dep_name,count(*) FROM employees e,department d WHERE e.dep_id = d.dep_id GROUP BY d.dep_name HAVING count(*) >= 2").show  

输出:

+---------+---+                                                                 
| dep_name|_c1|
+---------+---+
|  FINANCE|  3|
|    AUDIT|  5|
|MARKETING|  6|

我使用以下查询进行了尝试:

scala> finalEmployeesDf.as("df1").join(depDf.as("df2"), $"df1.dep_id" === $"df2.dep_id").select($"dep_name").groupBy($"dep_name").count.show()
+---------+-----+                                                               
| dep_name|count|
+---------+-----+
|  FINANCE|    3|
|    AUDIT|    5|
|MARKETING|    6|
+---------+-----+  

我知道这个isn't correct 因为假设我们有一个部门只有一个条目的情况,那么它也会列在这些结果中,但我希望只有在counts are greater than 2 时才显示结果。那么我怎样才能做到这一点???我试过谷歌搜索,但在这种情况下没有帮助。

【问题讨论】:

查询和数据帧操作之间没有性能差异,那么为什么需要这样做呢? 我只是从认证角度学习@cricket_007 【参考方案1】:

您的组和聚合部分有误。您需要选择所有相关的列,按相关的分组和聚合一次。以下是代表正确方法的未经测试的代码:

finalEmployeesDf.as("df1")
 .join(depDf.as("df2"), $"df1.dep_id" === $"df2.dep_id")
 .select($"dep_name")
 .groupBy($"dep_name")
 .agg(count($"dep_name").as("cnt"))
 .filter($"cnt" > 2)
 .show()

一般的建议是尝试将 API 调用分成几行,这会使阅读和理解大大更容易。

【讨论】:

就像一个魅力。但我必须在计数之前删除函数。那有什么意义 @Debuggerr 这没关系,count 是functions 包中的一个函数。它包含许多用于处理数据帧的内置函数,count 是其中之一,在 groupBy 之后应用。你真的应该看看他们:spark.apache.org/docs/latest/api/scala/… @Debuggerr 也请考虑接受/赞成答案,如果它有效并帮助你 但是当我尝试执行你的代码时,它会抛出一个找不到函数的错误。我正在开发 1.6 版本的 spark。那么我是否需要明确导入它才能使其正常工作?? @Debuggerr 对于 pyspark 使用 from pyspark.sql.functions import *,对于 Scala/Java 我总是使用函数。,但你可以使用等效的 import org.apache.spark.sql.functions;【参考方案2】:

试试这样的:

DF.groupBy("x").agg(count("*").alias("cnt")).where($"cnt" > 2)

【讨论】:

简单又甜美!

以上是关于将 SQL 查询转换为 Spark Dataframe 结构化数据处理的主要内容,如果未能解决你的问题,请参考以下文章

将 SQL 查询转换为 Spark Dataframe 结构化数据处理

将 spark 数据帧聚合转换为 SQL 查询; window、groupby 的问题,以及如何聚合?

sql 查询在 zeppelin 中转换为 spark lang

在 Spark SQL 中将 long 类型的列转换为 calendarinterval 类型

DataFrame编程模型初谈与Spark SQL

将转换从 hive sql 查询转移到 Spark