使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件

Posted

技术标签:

【中文标题】使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件【英文标题】:Querying avro data files stored in Azure Data Lake directly with raw SQL from Databricks 【发布时间】:2020-06-18 22:13:19 【问题描述】:

我正在使用 Databricks Notebooks 读取存储在 Azure Data Lake Gen2 中的 avro 文件。 avro 文件由事件中心捕获创建,并呈现特定架构。从这些文件中,我只需要提取 Body 字段,我感兴趣的数据实际上存储在该字段中。

我已经在 Python 中实现了这个,它可以按预期工作:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path) # 1
df1 = df0.select(df0.Body.cast('string')) # 2
rdd1 = df1.rdd.map(lambda x: x[0]) # 3
data = spark.read.json(rdd1) # 4

现在我需要将其转换为原始 SQL,以便直接在 SQL 查询中过滤数据。综合以上 4 步,使用 SQL 的第 1 步和第 2 步如下:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro)

SELECT * FROM body_array

通过这个部分查询,我得到与上面的 df1 相同的结果(使用 Python 的第 2 步):

Body
["id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000",
"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000",
"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000",
"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"]
["id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000",
"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000",
"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000",
"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"]
...

我需要知道如何将第 3 步和第 4 步引入 SQL 查询,将字符串解析为 json 对象,最后获得所需的数据帧,其中包含 id、group、value 和 timestamp 列。谢谢。

【问题讨论】:

【参考方案1】:

我发现使用原始 SQL 执行此操作的一种方法如下,使用 from_json Spark SQL 内置函数和 Body 字段的方案:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro),
data1 AS (SELECT from_json(Body, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') FROM body_array),
data2 AS (SELECT explode(*) FROM data1),
data3 AS (SELECT col.* FROM data2)
SELECT * FROM data3 WHERE id = "a123"     --FILTERING BY CHANNEL ID

它比我在问题中发布的 Python 代码执行得更快,这肯定是因为使用了 from_json 和 Body 的方案来提取其中的数据。我在 PySpark 中的这种方法版本如下所示:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)
df1 = df0.selectExpr("cast(Body as string) as json_data")
df2 = df1.selectExpr("from_json(json_data, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') as parsed_json")
data = df2.selectExpr("explode(parsed_json) as json").select("json.*")

【讨论】:

以上是关于使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件的主要内容,如果未能解决你的问题,请参考以下文章

R中databricks中的SQL雪花查询

变量值必须传入 Databricks 直接 sql 查询而不是 spark.sql(""" """)

如何从 Databricks 中的 SQL 语句输出创建变量

使用python截断Databricks中的增量表

将 Azure Databricks 增量表迁移到 Azure Synapse SQL 池

如何计算 Spark SQL(Databricks)中表中的列数?