How to extract data from nested arrays in an AWS Glue PySpark job

Posted: 2021-06-17 21:37:44


[Question]:

I have a dataframe with the following schema:

root
      |-- pid: string (nullable = true)
      |-- grouping: array (nullable = true)
      |    |-- element: struct (containsNull = true)
      |    |    |-- id: string (nullable = true)
      |    |    |-- definition: struct (nullable = true)
      |    |    |    |-- type: string (nullable = true)
      |    |    |    |-- name: string (nullable = true)
      |    |    |    |-- description: string (nullable = true)
      

which looks like this:

     pid  grouping
     1    [[id1,[def_type1,name1,desc1]],[id2,[def_type2,name2,desc2]]]
     2    [[id3,[def_type3,name3,desc3]],[id4,[def_type4,name4,desc4]]]
     
     pid: 1
     grouping: [
         {
            id: id1,
            definition: {
               type: def_type1,
               name: name1,
               description: desc1
            }
         },
         {
            id: id2,
            definition: {
               type: def_type2,
               name: name2,
               description: desc2
            }
         }
     ]

     pid: 2
     grouping: [
         {
            id: id3,
            definition: {
               type: def_type3,
               name: name3,
               description: desc3
            }
         },
         {
            id: id4,
            definition: {
               type: def_type4,
               name: name4,
               description: desc4
            }
         }
     ]
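
For anyone who wants to reproduce this locally, here is a minimal sketch that builds a dataframe of this shape. The SparkSession setup and the literal rows are my assumptions based on the schema and sample above, not code from the original post:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # sample rows shaped like the data above; nested tuples map onto the struct fields
    data = [
        ("1", [("id1", ("def_type1", "name1", "desc1")),
               ("id2", ("def_type2", "name2", "desc2"))]),
        ("2", [("id3", ("def_type3", "name3", "desc3")),
               ("id4", ("def_type4", "name4", "desc4"))]),
    ]

    # DDL schema string mirroring the printSchema() output above
    Df = spark.createDataFrame(
        data,
        "pid string, grouping array<struct<id:string, "
        "definition:struct<type:string, name:string, description:string>>>",
    )
    Df.printSchema()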
     

Expected output:

     root
      |-- pid: string (nullable = true)
      |-- pos: integer (nullable = false)
      |-- name: string (nullable = true)
      |-- deftype: string (nullable = true)
      |-- id: string (nullable = true)
      |-- desc: string (nullable = true)

      pid  pos  name   deftype     id    desc
      ----------------------------------------
      1    0    name1  def_type1   id1   desc1
      1    1    name2  def_type2   id2   desc2
      2    0    name3  def_type3   id3   desc3
      2    1    name4  def_type4   id4   desc4

Is it possible to explode each array item across all elements together with the pid, as shown above?


I used the approach below to get the output table, but is there another way to do it?

    from pyspark.sql.functions import posexplode
    from pyspark.sql import functions as sf

    df1 = Df.select(sf.col('pid'), sf.col('grouping'))

    # posexplode() emits (pos, col) pairs, so rename "col" after each explode
    df2 = df1.select('pid', posexplode(sf.col('grouping.definition.name')))
    df2 = df2.withColumnRenamed("col", "name")
    df3 = df1.select(sf.col('pid').alias('pid3'), posexplode(sf.col('grouping.definition.type')))
    df3 = df3.withColumnRenamed("col", "deftype")
    df4 = df1.select(sf.col('pid').alias('pid4'), posexplode(sf.col('grouping.id')))
    df4 = df4.withColumnRenamed("col", "id")
    df6 = df1.select(sf.col('pid').alias('pid5'), posexplode(sf.col('grouping.definition.description')))
    df6 = df6.withColumnRenamed("col", "desc")

    # stitch the four exploded columns back together on (pid, pos)
    df5 = (df2
           .join(df3, (df2["pos"] == df3["pos"]) & (df2["pid"] == df3["pid3"]), 'inner')
           .join(df4, (df2["pos"] == df4["pos"]) & (df2["pid"] == df4["pid4"]), 'inner')
           .join(df6, (df2["pos"] == df6["pos"]) & (df2["pid"] == df6["pid5"]), 'inner')
           .select(df2["*"], df3["deftype"], df4["id"], df6["desc"]))
    df5.printSchema()

  root
  |-- pid: string (nullable = true)
  |-- pos: integer (nullable = false)
  |-- name: string (nullable = true)
  |-- deftype: string (nullable = true)
  |-- id: string (nullable = true)
  |-- desc: string (nullable = true)
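
As a side note, the same table can be produced more compactly with a single posexplode over the struct array, followed by plain field access on the exploded struct. This is a sketch of my own based on the schema above, not code from the original post:

    from pyspark.sql import functions as sf

    # posexplode emits two columns, "pos" and "col"; the nested fields can
    # then be pulled straight out of the exploded struct
    df_flat = (Df
               .select('pid', sf.posexplode('grouping'))
               .select('pid',
                       'pos',
                       sf.col('col.definition.name').alias('name'),
                       sf.col('col.definition.type').alias('deftype'),
                       sf.col('col.id').alias('id'),
                       sf.col('col.definition.description').alias('desc')))
    df_flat.show()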

[Comments]:

[Answer 1]:

I suspect you can use the solution from my answer here: py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame. Basically, it looks like you want to flatten the nested objects; to do that, create a function like the following:

    from pyspark.sql.types import ArrayType, StructType

    def flatten(schema, prefix=None):
        """Flattens out nested schema
        NOTE: If different nested schemas have same named columns,
        the last one found will overwrite any earlier instances of that column"""
        fields = []
        for field in schema.fields:
            name = f"{prefix}.{field.name}" if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

Then call it like this:

    # in your imports:
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    # in your process:
    spark_context = SparkContext.getOrCreate()
    glue_context = GlueContext(spark_context)
    flattened_frame = your_frame.select(flatten(your_frame.schema))

    # if needed you can keep just the columns you want like:
    flattened_frame = flattened_frame.select("columnNameToKeep", "columnName2ToKeep")  # put the name of each column you want to keep in here

    # if needed you can rename all the columns like this:
    flattened_frame = flattened_frame.toDF("newColName1", "newColName2")  # Important: put a name for each column in here.
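
One caveat worth noting (my own observation, not part of the original answer): flatten only returns dotted column paths, and selecting such a path through an array of structs yields an array column rather than exploded rows, which is consistent with the comment below:

    # on the question's schema, flatten(Df.schema) returns dotted paths such as
    # ['pid', 'grouping.id', 'grouping.definition.type',
    #  'grouping.definition.name', 'grouping.definition.description'];
    # selecting one of them through the array still yields an array column:
    Df.select(sf.col('grouping.definition.name')).printSchema()
    # root
    #  |-- name: array (nullable = true)
    #  |    |-- element: string (containsNull = true)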

[Discussion]:

I don't think this will work in my case; my schema is already flat in that sense. Even after applying the flatten function above to my schema, the schema I get in flattened_frame is identical to my original one.
