使用 PySpark 删除 Dataframe 的嵌套列
Posted
技术标签:
【中文标题】使用 PySpark 删除 Dataframe 的嵌套列【英文标题】:Dropping nested column of Dataframe with PySpark 【发布时间】:2017-07-12 14:55:52 【问题描述】:我正在尝试使用 pyspark 在 Spark 数据框中删除一些嵌套列。 我为 Scala 找到了这个,它似乎正在做我想做的事,但我不熟悉 Scala,也不知道如何用 Python 编写它。
https://***.com/a/39943812/5706548
非常感谢您的帮助。
谢谢,
【问题讨论】:
【参考方案1】:pyspark 示例:
def drop_col(df, struct_nm, delete_struct_child_col_nm):
fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm, df.select(".*".format(struct_nm)).columns)
fields_to_keep = list(map(lambda x: ".".format(struct_nm, x), fields_to_keep))
return df.withColumn(struct_nm, struct(fields_to_keep))
【讨论】:
能解释一下参数吗? 这似乎对我有用。 df = 数据框 col_nm = 父列名 delete_col_nm = 要删除的目标子列【参考方案2】:我发现使用 pyspark 的一种方法是首先将嵌套列转换为 json,然后使用新的嵌套模式解析转换后的 json,并过滤掉不需要的列。
假设我有以下架构,我想从数据框中删除 d
、e
和 j
(a.b.d
、a.e
、a.h.j
):
root
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- c: long (nullable = true)
| | |-- d: string (nullable = true)
| |-- e: struct (nullable = true)
| | |-- f: long (nullable = true)
| | |-- g: string (nullable = true)
| |-- h: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- i: string (nullable = true)
| | | |-- j: string (nullable = true)
|-- k: string (nullable = true)
我使用了以下方法:
通过排除d
、e
和j
为a
创建新架构。一种快速的方法是从df.select("a").schema
中手动选择所需的字段,然后使用StructType
从选定的字段中创建一个新模式。或者,您可以通过遍历模式树并排除不需要的字段以编程方式执行此操作,例如:
def exclude_nested_field(schema, unwanted_fields, parent=""):
new_schema = []
for field in schema:
full_field_name = field.name
if parent:
full_field_name = parent + "." + full_field_name
if full_field_name not in unwanted_fields:
if isinstance(field.dataType, StructType):
inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
new_schema.append(StructField(field.name, inner_schema))
elif isinstance(field.dataType, ArrayType):
inner_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
new_schema.append(StructField(field.name, ArrayType(inner_schema)))
else:
new_schema.append(StructField(field.name, field.dataType))
return StructType(new_schema)
new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
将a
列转换为json:.withColumn("json", F.to_json("a")).drop("a")
a
列:.withColumn("a", F.from_json("json", new_schema)).drop("json")
【讨论】:
我正在尝试使用此函数,但我的结构有一个数组我们现在可以使用 Spark 版本 >= 3.1 原生地做到这一点
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html
【讨论】:
不工作:文件“C:\spark-3.1.2-bin-hadoop2.7\python\pyspark\sql\dataframe.py”,第 1643 行,在 getattr raise AttributeError(AttributeError: 'DataFrame' 对象没有属性 'dropFields' 这是 Column.dropFields,而不是 DataFrame.dropFields @deathrace【参考方案4】:尽管我没有 PySpark 的解决方案,但将其转换为 python 可能更容易。考虑一个带有架构的数据框df
:
root
|-- employee: struct (nullable = false)
| |-- name: string (nullable = false)
| |-- age: integer (nullable = false)
然后,如果您想要,例如放弃name
,
你可以这样做:
val fieldsToKeep = df.select($"employee.*").columns
.filter(_!="name") // the nested column you want to drop
.map(n => "employee."+n)
// overwite column with subset of fields
df
.withColumn("employee",struct(fieldsToKeep.head,fieldsToKeep.tail:_*))
【讨论】:
【参考方案5】:Raphaels Scala 答案的 Pyspark 版本。
这会在一定深度运行,丢弃高于该深度的所有内容并过滤其下方的行。
def remove_columns(df,root):
from pyspark.sql.functions import col
cols = df.select(root).columns
fields_filter = filter(lambda x: x[0]!= "$", cols) # use your own lambda here.
fieldsToKeep = list(map(lambda x: root[:-1] + x, fields_filter))
return df.select(fieldsToKeep)
df = remove_columns(raw_df, root="level1.level2.*")
【讨论】:
以上是关于使用 PySpark 删除 Dataframe 的嵌套列的主要内容,如果未能解决你的问题,请参考以下文章
如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null
pyspark dataframe数据连接(join)转化为pandas dataframe基于多个字段删除冗余数据