Spark:根据 s3 文件中的字段动态生成查询

Posted

技术标签:

【中文标题】Spark:根据 s3 文件中的字段动态生成查询【英文标题】:Spark : Dynamic generation of the query based on the fields in s3 file 【发布时间】:2020-03-19 13:28:12 【问题描述】:

过于简单的场景: 在 s3 文件中生成每月数据的过程。每个月运行的字段数可能不同。基于 s3 中的这些数据,我们将数据加载到表中,然后我们手动(因为每次运行中的字段数量可能会随着添加或删除几列而改变)运行 SQL 以获取少数指标。对此有更多的计算/转换数据,但要让初学者展示用例的更简单版本。

方法: 考虑到无模式的性质,因为每次运行时 s3 文件中的字段数量可能会有所不同,添加/删除少量字段,这需要每次在 SQL 中手动更改,我计划探索 Spark/Scala,以便我们可以直接从s3读取,根据字段动态生成SQL。

查询: 我如何在 scala/spark-SQL/dataframe 中实现这一点? s3 文件仅包含每次运行所需的字段。因此从 s3 读取动态字段没有问题,因为它由数据帧处理。问题是我们如何生成 SQL 数据帧-API/spark-SQL 代码来处理。

我可以通过数据帧读取 s3 文件并将数据帧注册为 createOrReplaceTempView 以编写 SQL,但我认为在下次运行期间在 s3 中添加新字段期间手动更改 spark-SQL 没有帮助。动态生成 sql 的最佳方法是什么/处理问题的更好方法是什么?

用例 1:

首轮

dataframe: customer,1st_month_count(这里dataframe直接指向s3,只有需要的属性)

--sample code
SELECT customer,sum(month_1_count)
FROM dataframe
GROUP BY customer

--Dataframe API/SparkSQL
dataframe.groupBy("customer").sum("month_1_count").show()

第二次运行 - 添加了一列

dataframe: customer,month_1_count,month_2_count) (这里dataframe直接指向s3,只有需要的属性)

--Sample SQL
SELECT customer,sum(month_1_count),sum(month_2_count)
FROM dataframe
GROUP BY customer

--Dataframe API/SparkSQL
dataframe.groupBy("customer").sum("month_1_count","month_2_count").show() 

我是 Spark/Scala 的新手,如果您能提供指导以便我进一步探索,将会很有帮助。

【问题讨论】:

【参考方案1】:

听起来您想对出现在数据框架构中的新列一遍又一遍地执行相同的操作?这有效:

from pyspark.sql import functions

#search for column names you want to sum, I put in "month"

column_search = lambda col_names: 'month' in col_names

#get column names of temp dataframe w/ only the columns you want to sum

relevant_columns = original_df.select(*filter(column_search, original_df.columns)).columns

#create dictionary with relevant column names to be passed to the agg function

columns = col_names: "sum" for col_names in relevant_columns

#apply agg function with your groupBy, passing in columns dictionary

grouped_df = original_df.groupBy("customer").agg(columns)

#show result

grouped_df.show()

一些重要的概念可以帮助你学习:

    DataFrame 的数据属性存储在列表中:dataframe.columns

    可以将函数应用于列表以创建新列表,如“column_search”

    Agg 函数接受字典中的多个表达式,如 here 所述,这是我传递给“列”的内容

    Spark 是惰性的,因此在您执行 show() 之类的操作之前,它不会更改数据状态或执行操作。这意味着写出临时数据帧以使用数据帧的一个元素(如我所做的列)并不昂贵,即使如果您习惯使用 SQL,它可能看起来效率低下。

【讨论】:

感谢凯文的回答。我不确定是否完全遵循,让我阅读更多关于您提供的详细信息并返回。

以上是关于Spark:根据 s3 文件中的字段动态生成查询的主要内容,如果未能解决你的问题,请参考以下文章

如何告诉 Spark 根据范围跳过分区

在 AWS Athena 中查询第一个非空值的动态 JSON 字段

Athena 根据 S3 中引入的分区动态查询更改

spark sql 无法在 S3 中查询镶木地板分区

动静态链接库

使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢