使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件
Posted
技术标签:
【中文标题】使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件【英文标题】:Querying avro data files stored in Azure Data Lake directly with raw SQL from Databricks 【发布时间】:2020-06-18 22:13:19 【问题描述】:我正在使用 Databricks Notebooks 读取存储在 Azure Data Lake Gen2 中的 avro 文件。 avro 文件由事件中心捕获创建,并呈现特定架构。从这些文件中,我只需要提取 Body 字段,我感兴趣的数据实际上存储在该字段中。
我已经在 Python 中实现了这个,它可以按预期工作:
path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path) # 1
df1 = df0.select(df0.Body.cast('string')) # 2
rdd1 = df1.rdd.map(lambda x: x[0]) # 3
data = spark.read.json(rdd1) # 4
现在我需要将其转换为原始 SQL,以便直接在 SQL 查询中过滤数据。综合以上 4 步,使用 SQL 的第 1 步和第 2 步如下:
CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")
WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro)
SELECT * FROM body_array
通过这个部分查询,我得到与上面的 df1 相同的结果(使用 Python 的第 2 步):
Body
["id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000",
"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000",
"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000",
"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"]
["id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000",
"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000",
"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000",
"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"]
...
我需要知道如何将第 3 步和第 4 步引入 SQL 查询,将字符串解析为 json 对象,最后获得所需的数据帧,其中包含 id、group、value 和 timestamp 列。谢谢。
【问题讨论】:
【参考方案1】:我发现使用原始 SQL 执行此操作的一种方法如下,使用 from_json Spark SQL 内置函数和 Body 字段的方案:
CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")
WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro),
data1 AS (SELECT from_json(Body, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') FROM body_array),
data2 AS (SELECT explode(*) FROM data1),
data3 AS (SELECT col.* FROM data2)
SELECT * FROM data3 WHERE id = "a123" --FILTERING BY CHANNEL ID
它比我在问题中发布的 Python 代码执行得更快,这肯定是因为使用了 from_json 和 Body 的方案来提取其中的数据。我在 PySpark 中的这种方法版本如下所示:
path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)
df1 = df0.selectExpr("cast(Body as string) as json_data")
df2 = df1.selectExpr("from_json(json_data, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') as parsed_json")
data = df2.selectExpr("explode(parsed_json) as json").select("json.*")
【讨论】:
以上是关于使用 Databricks 中的原始 SQL 直接查询存储在 Azure Data Lake 中的 avro 数据文件的主要内容,如果未能解决你的问题,请参考以下文章
变量值必须传入 Databricks 直接 sql 查询而不是 spark.sql(""" """)
如何从 Databricks 中的 SQL 语句输出创建变量