pyspark:将结构分解成列

Posted

技术标签:

【中文标题】pyspark:将结构分解成列【英文标题】:pyspark: Explode struct into columns 【发布时间】:2018-05-25 10:30:34 【问题描述】:

我创建了一个 udf,它返回一个未嵌套的 StructType。只是一个带有字段名称的混合类型(int、float)的数组。我想将它们分解/拆分成单独的列。请注意,这将创建大约 50 个新列。通过谷歌搜索,我找到了这个解决方案:

df_split = df.select('ID', 'my_struct.*')

这行得通。但是性能绝对糟糕,例如。无法使用。检查集群节点,这也仅使用 1 个核心。但这只能解释问题的一小部分。

那么什么是实现我的目标的好方法,为什么上面的解决方案这么慢?

编辑:

这似乎是导致性能不佳的 udf 和拆分的特定组合。这很慢:

df_udf = df.withColumn('udf', my_udf(df.input))
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

这很快:

df_udf = df.withColumn('udf', my_udf(df.input))
df_udf.cache()
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

【问题讨论】:

您在“编辑”中输入的内容是否解决了您的问题?如果是这样,您可以将其作为答案并接受它,这样人们就不会花时间尝试解决已经完成的问题。 嗯,它确实特别适合我的脚趾问题,但我不确定这是一个普遍可行的解决方案。这是一个教育项目,运行在一个只有我使用和完全控制的小型集群上。大型数据集上的 AFAIK 缓存会使集群承受严重的内存压力? 【参考方案1】:

根据要求,我正在编辑答案。请考虑到,这在我的情况下适用于一个小型测试集群(5 个节点),只有我使用相对较小的数据集(5000 万)处理它。

似乎是udf和split的特定组合导致性能不佳。这很慢:

df_udf = df.withColumn('udf', my_udf(df.input))
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

这很快:

df_udf = df.withColumn('udf', my_udf(df.input))
df_udf.cache()
df_exploded = df_udf.select('input', 'udf.*')
df_exploded.show(5)

【讨论】:

以上是关于pyspark:将结构分解成列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何将列表分解为具有顺序命名的多列?

PySpark 根据名称将列表分解为多列

使用 PySpark 分解数组值

Pyspark 将数组列分解为带有滑动窗口的子列表

PySpark 将 JSON 字符串分解为多列

通过 pyspark.ml CrossValidator 调整隐式 pyspark.ml ALS 矩阵分解模型的参数