在 Pyspark 中将结构转换为数组
Posted
技术标签:
【中文标题】在 Pyspark 中将结构转换为数组【英文标题】:Converting a Struct to an Array in Pyspark 【发布时间】:2021-11-30 00:39:39 【问题描述】:这是我的目标: 我尝试分析微软 Azure 数据工厂创建的 json 文件。我想将它们转换成一组关系表。
为了解释我的问题,我尝试创建一个复杂性降低的示例。 您可以使用以下 python 代码生成两个示例记录:
sample1 = """
"name": "Pipeline1",
"properties":
"parameters":
"a": "type": "string", "default": "",
"b": "type": "string", "default": "chris",
"c": "type": "string", "default": "columbus",
"d": "type": "integer", "default": "0"
,
"annotations": ["Test","Sample"]
"""
sample2 = """
"name": "Pipeline2",
"properties":
"parameters":
"x": "type": "string", "default": "X",
"y": "type": "string", "default": "Y",
,
"annotations": ["another sample"]
我加载这些数据的第一种方法当然是将它们读取为 json 结构:
df = spark.read.json(sc.parallelize([sample1,sample2]))
df.printSchema()
df.show()
但这会返回:
root
|-- _corrupt_record: string (nullable = true)
|-- name: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- annotations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- parameters: struct (nullable = true)
| | |-- a: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- type: string (nullable = true)
| | |-- b: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- type: string (nullable = true)
| | |-- c: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- type: string (nullable = true)
| | |-- d: struct (nullable = true)
| | | |-- default: string (nullable = true)
| | | |-- type: string (nullable = true)
+--------------------+---------+--------------------+
| _corrupt_record| name| properties|
+--------------------+---------+--------------------+
| null|Pipeline1|[Test, Sample], ...|
|
"name": "Pipel...|Pipeline2| null|
+--------------------+---------+--------------------+
如您所见,第二个示例没有加载,显然是因为 sample1 和 sample2 的架构不同(参数名称不同)。 我不知道,为什么微软决定将参数元素设为结构而不是数组 - 但我无法更改。
让我回到我的目标:我想从这些样本中创建两个数据框: 第一个数据框应包含注释(带有列 pipeline_name 和注释),另一个数据框应包含参数(带有列 pipeline_name、parameter_name、parameter_type 和 parameter_default)。
有人知道一种简单的方法,将结构(不是数组)的元素转换为数据帧的行吗? 首先,我在考虑一个用户定义的函数,它一个一个地转换 json 代码并循环遍历“参数”结构的元素以将它们作为数组的元素返回。但我没有确切地知道如何实现这一目标。我试过了:
import json
from pyspark.sql.types import *
# create a dataframe with the json data as strings
df = spark.createDataFrame([Row(json=sample1), Row(json=sample2)])
#define desired schema
new_schema = StructType([
StructField("pipeline", StructType([
StructField("name", StringType(), True)
,StructField("params", ArrayType(StructType([
StructField("paramname", StringType(), True)
,StructField("type", StringType(), True)
,StructField("default", StringType(), True)
])), None)
,StructField("annotations", ArrayType(StringType()), True)
]), True)
])
def parse_pipeline(source:str):
dict = json.loads(source)
name = dict["name"]
props = dict["properties"]
paramlist = [ ( key, value.get('type'), value.get('default')) for key, value in props.get("parameters",).items() ]
annotations = props.get("annotations")
return 'pipleine': 'name':name, 'params':paramlist, 'annotations': annotations
parse_pipeline_udf = udf(parse_pipeline, new_schema)
df = df.withColumn("data", parse_pipeline_udf(F.col("json")))
但这会返回错误消息:无法转换 JSON 字符串 '"metadata":,"name":"params","nullable":null,"type":"containsNull":true, "elementType":"fields":["metadata":,"name":"paramname","nullable":true,"type":"string","metadata":," name":"type","nullable":true,"type":"string","metadata":,"name":"default","nullable":true,"type":"string "],"type":"struct","type":"array"' 到一个字段。
也许错误来自我的udf的返回值。但如果是这个原因,我应该如何通过结果。 感谢您的帮助。
【问题讨论】:
你想要的是地图,而不是数组 请用这样的网站检查你的 jsons jsonformatter.curiousconcept.com/# 【参考方案1】:首先,我修复了您的数据样本:"""
和 缺失,额外的
,
:
sample1 = """
"name": "Pipeline1",
"properties":
"parameters":
"a": "type": "string", "default": "",
"b": "type": "string", "default": "chris",
"c": "type": "string", "default": "columbus",
"d": "type": "integer", "default": "0"
,
"annotations": ["Test","Sample"]
"""
sample2 = """
"name": "Pipeline2",
"properties":
"parameters":
"x": "type": "string", "default": "X",
"y": "type": "string", "default": "Y"
,
"annotations": ["another sample"]
"""
只要解决这个问题,您应该在使用基本代码时包含 sample2。 但如果你想要“数组”,实际上,你需要一个map type。
new_schema = T.StructType([
T.StructField("name", T.StringType()),
T.StructField("properties", T.StructType([
T.StructField("annotations", T.ArrayType(T.StringType())),
T.StructField("parameters", T.MapType(T.StringType(), T.StructType([
T.StructField("default", T.StringType()),
T.StructField("type", T.StringType()),
])))
]))
])
df = spark.read.json(sc.parallelize([sample1, sample2]), new_schema)
结果:
df.show(truncate=False)
+---------+-----------------------------------------------------------------------------------------------------+
|name |properties |
+---------+-----------------------------------------------------------------------------------------------------+
|Pipeline1|[[Test, Sample], [a -> [, string], b -> [chris, string], c -> [columbus, string], d -> [0, integer]]]|
|Pipeline2|[[another sample], [x -> [X, string], y -> [Y, string]]] |
+---------+-----------------------------------------------------------------------------------------------------+
df.printSchema()
root
|-- name: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- annotations: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- parameters: map (nullable = true)
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- default: string (nullable = true)
| | | |-- type: string (nullable = true)
【讨论】:
对于示例中的错误以及修复它们带来的不便,我们深表歉意。并感谢您的解决方案。这正是我想要的。以上是关于在 Pyspark 中将结构转换为数组的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中将 URI 查询字符串转换为结构键值数组
如何在 PySpark 中将 Vector 类型的列转换为数组/字符串类型?