在 Pyspark 中将结构转换为数组

Posted

技术标签:

【中文标题】在 Pyspark 中将结构转换为数组【英文标题】:Converting a Struct to an Array in Pyspark 【发布时间】:2021-11-30 00:39:39 【问题描述】:

这是我的目标: 我尝试分析微软 Azure 数据工厂创建的 json 文件。我想将它们转换成一组关系表。

为了解释我的问题,我尝试创建一个复杂性降低的示例。 您可以使用以下 python 代码生成两个示例记录:

sample1 = """
    "name": "Pipeline1",
    "properties": 
        "parameters": 
            "a": "type": "string", "default": "",
            "b": "type": "string", "default": "chris",
            "c": "type": "string", "default": "columbus",
            "d": "type": "integer", "default": "0"
        ,
        "annotations": ["Test","Sample"]
    
"""

sample2 = """
    "name": "Pipeline2",
    "properties": 
        "parameters": 
            "x": "type": "string", "default": "X",
            "y": "type": "string", "default": "Y",
        ,
        "annotations": ["another sample"]
    

我加载这些数据的第一种方法当然是将它们读取为 json 结构:

df = spark.read.json(sc.parallelize([sample1,sample2]))
df.printSchema()
df.show()

但这会返回:

root
 |-- _corrupt_record: string (nullable = true)
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: struct (nullable = true)
 |    |    |-- a: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- d: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

+--------------------+---------+--------------------+
|     _corrupt_record|     name|          properties|
+--------------------+---------+--------------------+
|                null|Pipeline1|[Test, Sample], ...|
|
    "name": "Pipel...|Pipeline2|                null|
+--------------------+---------+--------------------+

如您所见,第二个示例没有加载,显然是因为 sample1 和 sample2 的架构不同(参数名称不同)。 我不知道,为什么微软决定将参数元素设为结构而不是数组 - 但我无法更改。

让我回到我的目标:我想从这些样本中创建两个数据框: 第一个数据框应包含注释(带有列 pipeline_name 和注释),另一个数据框应包含参数(带有列 pipeline_name、parameter_name、parameter_type 和 parameter_default)。

有人知道一种简单的方法,将结构(不是数组)的元素转换为数据帧的行吗? 首先,我在考虑一个用户定义的函数,它一个一个地转换 json 代码并循环遍历“参数”结构的元素以将它们作为数组的元素返回。但我没有确切地知道如何实现这一目标。我试过了:

import json
from pyspark.sql.types import *

# create a dataframe with the json data as strings
df = spark.createDataFrame([Row(json=sample1), Row(json=sample2)])

#define desired schema
new_schema = StructType([
   StructField("pipeline", StructType([
     StructField("name", StringType(), True)
    ,StructField("params", ArrayType(StructType([
       StructField("paramname", StringType(), True)
      ,StructField("type", StringType(), True)
      ,StructField("default", StringType(), True)
      ])), None)
    ,StructField("annotations", ArrayType(StringType()), True)
    ]), True)
  ])

def parse_pipeline(source:str):
  dict = json.loads(source)
  name = dict["name"]
  props = dict["properties"]
  paramlist = [ ( key,  value.get('type'), value.get('default')) for key, value in props.get("parameters",).items() ]
  annotations = props.get("annotations")
  return 'pipleine':  'name':name, 'params':paramlist, 'annotations': annotations

parse_pipeline_udf = udf(parse_pipeline, new_schema)
df = df.withColumn("data", parse_pipeline_udf(F.col("json")))

但这会返回错误消息:无法转换 JSON 字符串 '"metadata":,"name":"params","nullable":null,"type":"containsNull":true, "elementType":"fields":["metadata":,"name":"paramname","nullable":true,"type":"string","metadata":," name":"type","nullable":true,"type":"string","metadata":,"name":"default","nullable":true,"type":"string "],"type":"struct","type":"array"' 到一个字段。

也许错误来自我的udf的返回值。但如果是这个原因,我应该如何通过结果。 感谢您的帮助。

【问题讨论】:

你想要的是地图,而不是数组 请用这样的网站检查你的 jsons jsonformatter.curiousconcept.com/# 【参考方案1】:

首先,我修复了您的数据样本:""" 缺失,额外的,

sample1 = """
    "name": "Pipeline1",
    "properties": 
        "parameters": 
            "a": "type": "string", "default": "",
            "b": "type": "string", "default": "chris",
            "c": "type": "string", "default": "columbus",
            "d": "type": "integer", "default": "0"
        ,
        "annotations": ["Test","Sample"]
    
"""

sample2 = """
    "name": "Pipeline2",
    "properties": 
        "parameters": 
            "x": "type": "string", "default": "X",
            "y": "type": "string", "default": "Y"
        ,
        "annotations": ["another sample"]
    
"""

只要解决这个问题,您应该在使用基本代码时包含 sample2。 但如果你想要“数组”,实际上,你需要一个map type。

new_schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("properties", T.StructType([
        T.StructField("annotations", T.ArrayType(T.StringType())),
        T.StructField("parameters", T.MapType(T.StringType(), T.StructType([
            T.StructField("default", T.StringType()),
            T.StructField("type", T.StringType()),
        ])))
    ]))
])

df = spark.read.json(sc.parallelize([sample1, sample2]), new_schema)

结果:

df.show(truncate=False)
+---------+-----------------------------------------------------------------------------------------------------+
|name     |properties                                                                                           |
+---------+-----------------------------------------------------------------------------------------------------+
|Pipeline1|[[Test, Sample], [a -> [, string], b -> [chris, string], c -> [columbus, string], d -> [0, integer]]]|
|Pipeline2|[[another sample], [x -> [X, string], y -> [Y, string]]]                                             |
+---------+-----------------------------------------------------------------------------------------------------+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

【讨论】:

对于示例中的错误以及修复它们带来的不便,我们深表歉意。并感谢您的解决方案。这正是我想要的。

以上是关于在 Pyspark 中将结构转换为数组的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中将 URI 查询字符串转换为结构键值数组

pyspark 在循环中将数组转换为字符串

如何在 PySpark 中将 Vector 类型的列转换为数组/字符串类型?

如何在pyspark中将rdd行转换为带有json结构的数据框?

如何在pyspark中将字符串列转换为ArrayType

pyspark pandas 对象作为数据框 - TypeError