使用 PySpark 删除 Dataframe 的嵌套列

Posted

技术标签:

【中文标题】使用 PySpark 删除 Dataframe 的嵌套列【英文标题】:Dropping nested column of Dataframe with PySpark 【发布时间】:2017-07-12 14:55:52 【问题描述】:

我正在尝试使用 pyspark 在 Spark 数据框中删除一些嵌套列。 我为 Scala 找到了这个,它似乎正在做我想做的事,但我不熟悉 Scala,也不知道如何用 Python 编写它。

https://***.com/a/39943812/5706548

非常感谢您的帮助。

谢谢,

【问题讨论】:

【参考方案1】:

pyspark 示例:

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    fields_to_keep = filter(lambda x:  x != delete_struct_child_col_nm, df.select(".*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x:  ".".format(struct_nm, x), fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))

【讨论】:

能解释一下参数吗? 这似乎对我有用。 df = 数据框 col_nm = 父列名 delete_col_nm = 要删除的目标子列【参考方案2】:

我发现使用 pyspark 的一种方法是首先将嵌套列转换为 json,然后使用新的嵌套模式解析转换后的 json,并过滤掉不需要的列。

假设我有以下架构,我想从数据框中删除 deja.b.da.ea.h.j):

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)

我使用了以下方法:

    通过排除deja 创建新架构。一种快速的方法是从df.select("a").schema 中手动选择所需的字段,然后使用StructType 从选定的字段中创建一个新模式。或者,您可以通过遍历模式树并排除不需要的字段以编程方式执行此操作,例如:

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []
    
        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name
    
            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                elif isinstance(field.dataType, ArrayType):
                    inner_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, ArrayType(inner_schema)))
                else:
                    new_schema.append(StructField(field.name, field.dataType))
    
        return StructType(new_schema)
    
    new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
    

    a 列转换为json:.withColumn("json", F.to_json("a")).drop("a")

    使用在步骤 1 中找到的新模式解析步骤 2 中的 json 转换的 a 列:.withColumn("a", F.from_json("json", new_schema)).drop("json")

【讨论】:

我正在尝试使用此函数,但我的结构有一个数组> 并且我收到 "TypeError: 'ArrayType' object is not iterable" 。任何想法如何解决这个问题? 我们现在在 Spark 3.1.1 中有一些东西可以更好地处理嵌套字段,并且能够在不触及其他字段的情况下编辑或删除它们germanschiavon.medium.com/…【参考方案3】:

我们现在可以使用 Spark 版本 >= 3.1 原生地做到这一点

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html

【讨论】:

不工作:文件“C:\spark-3.1.2-bin-hadoop2.7\python\pyspark\sql\dataframe.py”,第 1643 行,在 getattr raise AttributeError(AttributeError: 'DataFrame' 对象没有属性 'dropFields' 这是 Column.dropFields,而不是 DataFrame.dropFields @deathrace【参考方案4】:

尽管我没有 PySpark 的解决方案,但将其转换为 python 可能更容易。考虑一个带有架构的数据框df

root
 |-- employee: struct (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- age: integer (nullable = false)

然后,如果您想要,例如放弃name, 你可以这样做:

val fieldsToKeep = df.select($"employee.*").columns
.filter(_!="name") // the nested column you want to drop
.map(n => "employee."+n)

// overwite column with subset of fields
df
.withColumn("employee",struct(fieldsToKeep.head,fieldsToKeep.tail:_*)) 

【讨论】:

【参考方案5】:

Raphaels Scala 答案的 Pyspark 版本。

这会在一定深度运行,丢弃高于该深度的所有内容并过滤其下方的行。

def remove_columns(df,root):
  from pyspark.sql.functions import col
  cols = df.select(root).columns
  fields_filter = filter(lambda x: x[0]!= "$", cols) # use your own lambda here. 
  fieldsToKeep = list(map(lambda x: root[:-1] + x, fields_filter)) 
  return df.select(fieldsToKeep)

df = remove_columns(raw_df, root="level1.level2.*")

【讨论】:

以上是关于使用 PySpark 删除 Dataframe 的嵌套列的主要内容,如果未能解决你的问题,请参考以下文章

如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null

pyspark dataframe数据连接(join)转化为pandas dataframe基于多个字段删除冗余数据

PySpark DataFrame 无法删除重复项

从 PySpark DataFrame 中的列表列表中删除列表

pyspark dataframe api速览

pyspark中dataframe的清理操作