如何在 AWS 胶水上将嵌套的 JSON 扩展为 Spark 数据框

Posted

技术标签:

【中文标题】如何在 AWS 胶水上将嵌套的 JSON 扩展为 Spark 数据框【英文标题】:How to expand nested JSON into Spark dataframe on AWS glue 【发布时间】:2022-01-03 07:41:45 【问题描述】:

使用以下营销 JSON 文件


    "request_id": "xx",
    "timeseries_stats": [
        
            "timeseries_stat": 
                "id": "xx",
                "timeseries": [
                    
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": 
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        
                    ,
                    
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": 
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        
                    

我可以很容易地使用 pandas 解析这个并获得所需格式的数据帧

start_time   end_time     impressions   swipes   view_completion    spend
    xx          xx             xx         xx            xx            xx
    xx          xx             xx         xx            xx            xx

但需要在 AWS Glue 上使用 spark。

使用创建初始 spark 数据帧 (df) 后

rdd = sc.parallelize(JSON_resp['timeseries_stats'][0]['timeseries_stat']['timeseries'])
df = rdd.toDF()

我尝试按如下方式扩展 stats

df_expanded = df.select("start_time","end_time","stats.*")

错误:

AnalysisException: 'Can only star expand struct data types. 
Attribute: `ArrayBuffer(stats)`;'

&

from pyspark.sql.functions import explode
df_expanded = df.select("start_time","end_time").withColumn("stats", explode(df.stats))

错误:

AnalysisException: 'The number of aliases supplied in the AS clause does not match the 
number of columns output by the UDTF expected 2 aliases but got stats ;

spark 很新,对于这两种方法中的任何一种,任何帮助都将不胜感激!

这是一个非常相似的问题:

parse array of dictionaries from JSON with Spark

除了我需要展平这个额外的统计键。

【问题讨论】:

【参考方案1】:

当你 explode 一个地图列时,它会给你两列,所以 .withColumn 不起作用。将explodeselect 语句一起使用。

from pyspark.sql import functions as f

df.select('start_time', 'end_time', f.explode('stats')) \
  .groupBy('start_time', 'end_time').pivot('key').agg(f.first('value')).show()

+----------+--------+-----------+-----+------+---------------+
|start_time|end_time|impressions|spend|swipes|view_completion|
+----------+--------+-----------+-----+------+---------------+
|        yy|      yy|         yy|   yy|    yy|             yy|
|        xx|      xx|         xx|   xx|    xx|             xx|
+----------+--------+-----------+-----+------+---------------+

【讨论】:

谢谢!我唯一要做的就是声明import pyspark.sql.functions as f 让它工作。

以上是关于如何在 AWS 胶水上将嵌套的 JSON 扩展为 Spark 数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

在 Redshift 中以通用方式将 JSON 数据扩展为新列

将列内包含 JSON 数组的行扩展为多行

如何将嵌套的Struct列展开为多列?

如何在Python中将列表扩展为函数参数[重复]

我可以将 Enum 扩展为 Enum.GetValues() 吗? [复制]