How to extract data from nested arrays in an AWS Glue PySpark job
Posted: 2021-06-17 21:37:44

I have a dataframe with the following schema:
root
|-- pid: string (nullable = true)
|-- grouping: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- definition: struct (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- description: string (nullable = true)
with data as shown below:
pid grouping
1 [[id1,[def_type1,name1,desc1]],[id2[def_type2,name2,desc2]]]
2 [[id3,[def_type3,name3,desc3]],[id4[def_type4,name4,desc4]]]
pid: 1
grouping: [
  { id: id1,
    definition: { type: def_type1, name: name1, description: desc1 } },
  { id: id2,
    definition: { type: def_type2, name: name2, description: desc2 } }
]
pid: 2
grouping: [
  { id: id3,
    definition: { type: def_type3, name: name3, description: desc3 } },
  { id: id4,
    definition: { type: def_type4, name: name4, description: desc4 } }
]
Expected output:
root
|-- pid: string (nullable = true)
|-- pos: integer (nullable = false)
|-- name: string (nullable = true)
|-- deftype: string (nullable = true)
|-- id: string (nullable = true)
|-- desc: string (nullable = true)
pid pos name deftype id desc
----------------------------------------
1 0 name1 def_type1 id1 desc1
1 1 name2 def_type2 id2 desc2
2 0 name3 def_type3 id3 desc3
2 1 name4 def_type4 id4 desc4
Is it possible to explode each array element together with its pid, as shown above?
I used the approach below to get the output table, but is there a better way to do it?
from pyspark.sql import functions as sf
from pyspark.sql.functions import posexplode

# explode each nested field separately, keeping the array position for the join
df1 = Df.select(sf.col('pid'), sf.col('grouping'))
df2 = df1.select('pid', posexplode(sf.col('grouping.definition.name'))).withColumnRenamed('col', 'name')
df3 = df1.select(sf.col('pid').alias('pid3'), posexplode(sf.col('grouping.definition.type'))).withColumnRenamed('col', 'deftype')
df4 = df1.select(sf.col('pid').alias('pid4'), posexplode(sf.col('grouping.id'))).withColumnRenamed('col', 'id')
df6 = df1.select(sf.col('pid').alias('pid5'), posexplode(sf.col('grouping.definition.description'))).withColumnRenamed('col', 'desc')

# join the exploded frames back together on (pid, pos)
df5 = (df2.join(df3, (df2['pos'] == df3['pos']) & (df2['pid'] == df3['pid3']), 'inner')
          .join(df4, (df2['pos'] == df4['pos']) & (df2['pid'] == df4['pid4']), 'inner')
          .join(df6, (df2['pos'] == df6['pos']) & (df2['pid'] == df6['pid5']), 'inner')
          .select(df2['*'], df3['deftype'], df4['id'], df6['desc']))
# df2.show(15, False)
df5.printSchema()
root
|-- pid: string (nullable = true)
|-- pos: integer (nullable = false)
|-- name: string (nullable = true)
|-- deftype: string (nullable = true)
|-- id: string (nullable = true)
|-- desc: string (nullable = true)
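For comparison, the same output can be produced with a single posexplode over the struct array, with no joins at all. A minimal sketch, assuming the schema above:

from pyspark.sql import functions as sf

# explode the grouping array once; each element is a struct
exploded = Df.select('pid', sf.posexplode('grouping').alias('pos', 'g'))

# pull the target fields out of each exploded struct element
result = exploded.select(
    'pid',
    'pos',
    sf.col('g.definition.name').alias('name'),
    sf.col('g.definition.type').alias('deftype'),
    sf.col('g.id').alias('id'),
    sf.col('g.definition.description').alias('desc'),
)
result.show()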
Answer 1:

I suspect you can use the solution from my answer here: py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame. Basically, it looks like you want to flatten the nested object. To do that, create a function like the following:
from pyspark.sql.types import ArrayType, StructType

def flatten(schema, prefix=None):
    """Flattens out a nested schema.
    NOTE: If different nested schemas have same-named columns,
    the last one found will overwrite any earlier instances of that column."""
    fields = []
    for field in schema.fields:
        # build the dotted path to this field
        name = f"{prefix}.{field.name}" if prefix else field.name
        dtype = field.dataType
        # look through arrays at their element type
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        # recurse into structs, otherwise record the leaf column
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields
Then call it like this:
# in your imports:
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# in your process:
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

flattened_frame = your_frame.select(flatten(your_frame.schema))

# if needed you can keep just the columns you want, like:
flattened_frame = flattened_frame.select("columnNameToKeep", "columnName2ToKeep")  # put the name of each column you want to keep in here
# if needed you can rename all the columns like this:
flattened_frame = flattened_frame.toDF("newColName1", "newColName2")  # Important: put a name for each column in here.
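As a rough illustration (hypothetical, using the frame from the question as your_frame), the function just returns the dotted paths to the leaf fields:

# on the question's schema, flatten would return something like:
# ['pid', 'grouping.id', 'grouping.definition.type',
#  'grouping.definition.name', 'grouping.definition.description']
print(flatten(your_frame.schema))
# note: because grouping is an array, selecting these paths yields
# array-typed columns, so an explode is still needed to get one row
# per array element.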
Comments:
I don't think this applies to my case; my schema is effectively already flat, and even after using the flatten function on my schema as above, the schema I get in flattened_frame is the same as my original schema.