如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

Posted

技术标签:

【中文标题】如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?【英文标题】:How to flatten an array in a nested json in aws glue using pyspark? 【发布时间】:2019-10-04 15:51:11 【问题描述】:

我正在尝试将 JSON 文件展平,以便能够将其全部加载到 AWS Glue 中的 PostgreSQL 中。我正在使用 PySpark。我使用爬虫爬取 S3 JSON 并生成一个表。然后我使用 ETL Glue 脚本:

读取爬取的表格 使用“关系化”函数来展平文件 将动态帧转换为数据帧 尝试“分解” request.data 字段

到目前为止的脚本:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")

df0 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")

df1 = df0.select(dfc_root_table_name)

df2 = df1.toDF()

df2 = df1.select(explode(col('`request.data`')).alias("request_data"))

<then i write df1 to a PostgreSQL database which works fine>

我面临的问题:

“Relationalize”函数运行良好,但 request.data 字段变为 bigint,因此“explode”不起作用。

由于数据结构的原因,如果不首先在 JSON 上使用“关系化”,就无法进行分解。具体错误是:“org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析'explode(request.data)':函数explode的输入应该是数组或映射类型,而不是bigint”

如果我尝试先将动态帧设为数据帧,则会出现以下问题:“py4j.protocol.Py4JJavaError:调用 o72.jdbc 时发生错误。 : java.lang.IllegalArgumentException: Can't get JDBC type for struct..."

我还尝试上传一个分类器,以便数据在爬网过程中变平,但 AWS 确认这不起作用。

原始文件的 JSON 格式如下,我正在尝试规范化:

- field1
- field2
- 
  - field3
  - 
    - field4
    - field5
  - []
    - 
      - field6
      - 
        - field7
        - field8
        - 
          - field9
          - 
            - field10

【问题讨论】:

【参考方案1】:
# Flatten nested df  
def flatten_df(nested_df): 
    for col in nested_df.columns:


    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for col in array_cols:
        nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))

    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    if len(nested_cols) == 0:
        return nested_df

    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']

    flat_df = nested_df.select(flat_cols +
                            [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])

    return flatten_df(flat_df)

df=flatten_df(df)

它将用下划线替换所有点。请注意,它使用explode_outer 而不是explode 来包含 Null 值,以防数组本身为空。此功能仅在spark v2.4+ 中可用。

还请记住,爆炸数组会增加更多重复项,并且整体行大小会增加。展平结构将增加列大小。简而言之,您的原始 df 将水平和垂直爆炸。以后可能会减慢处理数据的速度。

因此,我的建议是识别与功能相关的数据,并将这些数据仅存储在 postgresql 中,并将原始 json 文件存储在 s3 中。

【讨论】:

这适用于我的大部分 json 文件。但是当结构/数组为 NULL 时出现错误 可以用posexplode_outer函数代替explode_outer吗? 不错的一个。第一行for col in nested_df.columns:需要去掉 这将在数据框中创建重复记录【参考方案2】:

一旦你合理化了 json 列,你就不需要分解它了。 Relationalize 将嵌套的 JSON 转换为 JSON 文档最外层的键值对。转换后的数据维护一个嵌套 JSON 中的原始键列表,以句点分隔。

示例:

嵌套 json :


    "player": 
        "username": "user1",
        "characteristics": 
            "race": "Human",
            "class": "Warlock",
            "subclass": "Dawnblade",
            "power": 300,
            "playercountry": "USA"
        ,
        "arsenal": 
            "kinetic": 
                "name": "Sweet Business",
                "type": "Auto Rifle",
                "power": 300,
                "element": "Kinetic"
            ,
            "energy": 
                "name": "MIDA Mini-Tool",
                "type": "Submachine Gun",
                "power": 300,
                "element": "Solar"
            ,
            "power": 
                "name": "Play of the Game",
                "type": "Grenade Launcher",
                "power": 300,
                "element": "Arc"
            
        ,
        "armor": 
            "head": "Eye of Another World",
            "arms": "Philomath Gloves",
            "chest": "Philomath Robes",
            "leg": "Philomath Boots",
            "classitem": "Philomath Bond"
        ,
        "location": 
            "map": "Titan",
            "waypoint": "The Rig"
        
    

合理化后扁平化的json:


    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"

因此,在您的情况下,request.data 已经是从请求列扁平化的新列,其类型被 spark 解释为 bigint。

参考:Simplify/querying nested json with the aws glue relationalize transform

【讨论】:

没错,但问题是 JSON 结构 (request.data) 中有一个数组,需要展平。否则,它只返回 1 的 bigint(即省略内部的实际数据),这是不正确的。否则,合理化效果很好。 @charlesperry,你是对的。 Relationalize 仅适用于 JSON 的最外层,并且应该在文档中明确说明。我仍在尝试找出将具有 5 级嵌套数组和结构的 JSON 文件关系化的最佳方法。 @ruifgmonteiro 您是否设法解决了这个问题。我们正在尝试使用嵌套数组来合理化对象。 @Sigex 我们最终使用了一种基于 Spark SQL 的不同方法,在该方法中,我们根据预期的模式创建一个表,然后我们使用 SQL 来应用所需的转换。这一直运作良好,它比我们最初的方法简单得多,它依赖于通过层次结构级别递归地应用转换。归根结底,由于缺乏文档,我们决定不投入更多精力使用胶水库,并且我们看到使用 Spark 库的更多好处,因为我们可以轻松迁移到另一个平台,而无需大幅更改代码。

以上是关于如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark aws 胶水时显示 DataFrame

使用 pyspark 和 aws 胶水进行数据转置

aws 胶水 pyspark 删除数组中的结构,但保留数据并保存到 dynamodb

我可以使用 aws 胶水在 sql server 中创建表/写入表吗?

将胶水pyspark错误写入文本文件

如何在 AWS Glue PySpark 中运行并行线程?