Flatten nested array in Spark DataFrame
Posted: 2021-03-04 14:23:20
Question: I am reading some JSON that looks like this:
{"a": [{"b": {"c": 1, "d": 2}}]}
That is, the array items are unnecessarily nested. Since this happens inside an array, the answer given in How to flatten a struct in a Spark dataframe? does not apply directly.
This is what the dataframe looks like once parsed:
root
|-- a: array
| |-- element: struct
| | |-- b: struct
| | | |-- c: integer
| | | |-- d: integer
I would like to transform the dataframe so that it reads:
root
|-- a: array
| |-- element: struct
| | |-- b_c: integer
| | |-- b_d: integer
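In plain Python terms, the reshaping that the two schemas describe looks like this (a sketch on ordinary dicts, independent of Spark; the helper name `flatten_item` is made up for illustration):

```python
def flatten_item(item):
    # Lift the fields of the inner "b" struct up one level,
    # prefixing them with the parent name: b.c -> b_c, b.d -> b_d.
    return {f"b_{k}": v for k, v in item["b"].items()}

data = {"a": [{"b": {"c": 1, "d": 2}}]}
flat = {"a": [flatten_item(item) for item in data["a"]]}
# flat == {"a": [{"b_c": 1, "b_d": 2}]}
```

Each array element keeps its position; only the inner b level disappears, with its fields renamed to b_c and b_d.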
How can I alias the columns inside the array to effectively un-nest it?
Answer 1: You can use transform:
df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
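For inner structs with more fields, the same expression can be generated rather than typed by hand. A sketch, independent of any Spark session; the helper name `nested_array_flatten_expr` and its signature are made up for illustration:

```python
def nested_array_flatten_expr(array_col, inner, fields, sentinel="x"):
    # Build a SQL expression that maps over the array, lifting each
    # field of the inner struct and aliasing it as <inner>_<field>.
    parts = ", ".join(
        f"{sentinel}.{inner}.{f} as {inner}_{f}" for f in fields
    )
    return f"transform({array_col}, {sentinel} -> struct({parts})) as {array_col}"

expr = nested_array_flatten_expr("a", "b", ["c", "d"])
# expr == "transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a"
```

The generated string is exactly the one passed to selectExpr above, so the same approach scales to wider structs.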
Answer 2: Using the approach introduced in the accepted answer, I wrote a function to recursively un-nest a dataframe (recursing into nested arrays as well):
from pyspark.sql.types import ArrayType, StructType


def flatten(df, sentinel="x"):
    def _gen_flatten_expr(schema, indent, parents, last, transform=False):
        def handle(field, last):
            path = parents + (field.name,)
            alias = (
                " as "
                + "_".join(path[1:] if transform else path)
                + ("," if not last else "")
            )
            if isinstance(field.dataType, StructType):
                yield from _gen_flatten_expr(
                    field.dataType, indent, path, last, transform
                )
            elif (
                isinstance(field.dataType, ArrayType)
                and isinstance(field.dataType.elementType, StructType)
            ):
                yield indent, "transform("
                yield indent + 1, ".".join(path) + ","
                yield indent + 1, sentinel + " -> struct("
                yield from _gen_flatten_expr(
                    field.dataType.elementType,
                    indent + 2,
                    (sentinel,),
                    True,
                    True,
                )
                yield indent + 1, ")"
                yield indent, ")" + alias
            else:
                yield indent, ".".join(path) + alias

        try:
            *fields, last_field = schema.fields
        except ValueError:
            pass
        else:
            for field in fields:
                yield from handle(field, False)
            yield from handle(last_field, last)

    lines = []
    for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
        spaces = " " * 4 * indent
        lines.append(spaces + line)
    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    return df.selectExpr(expr).select(sentinel + ".*")
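The recursion above can be mirrored on plain Python data, which is handy for checking what the generated expression should produce. A sketch, not part of the answer's code; `flatten_plain` is a made-up name:

```python
def flatten_plain(value, prefix=()):
    # Recursively flatten dicts; for lists of dicts, flatten each
    # element with a fresh prefix (mirroring the transform(...) branch,
    # where the lambda variable restarts the path inside the array).
    out = {}
    for k, v in value.items():
        path = prefix + (k,)
        if isinstance(v, dict):
            out.update(flatten_plain(v, path))
        elif isinstance(v, list) and all(isinstance(e, dict) for e in v):
            out["_".join(path)] = [flatten_plain(e) for e in v]
        else:
            out["_".join(path)] = v
    return out

row = {"a": [{"b": {"c": 1, "d": 2}}]}
# flatten_plain(row) == {"a": [{"b_c": 1, "b_d": 2}]}
```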
Comments:
This did nothing for me.
@Garglesoap Can you reduce your problem to a short example that you can share here?
Sorry, I was frustrated. I found that something like this works: newdf = result.withColumn("sentiment", explode("sentiment")).select("*", col("sentiment.*")).drop("document", "sentence", "tokens", "word_embeddings", "sentence_embeddings", "sentiment")
Answer 3: A simplified approach:
from pyspark.sql.functions import col


def flatten_df(nested_df):
    # Flattens nested struct columns; unlike the recursive version
    # above, it does not recurse into arrays.
    stack = [((), nested_df)]
    columns = []
    while len(stack) > 0:
        parents, df = stack.pop()
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        columns.extend(flat_cols)
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
    return nested_df.select(columns)
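The same stack-based traversal can be sketched on plain nested dicts to show how the dotted source paths turn into underscore-joined aliases (`flatten_keys` is a made-up name, not Spark code):

```python
def flatten_keys(nested, sep="_"):
    # Iteratively walk nested dicts with an explicit stack, emitting
    # one flat key per leaf, the same way flatten_df pairs each
    # dotted column path with col(...).alias(...).
    stack = [((), nested)]
    out = {}
    while stack:
        parents, d = stack.pop()
        for k, v in d.items():
            path = parents + (k,)
            if isinstance(v, dict):
                stack.append((path, v))
            else:
                out[sep.join(path)] = v
    return out

# flatten_keys({"a": {"b": 1, "c": {"d": 2}}}) == {"a_b": 1, "a_c_d": 2}
```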
Reference: https://docs.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema