PySpark 将“map”类型的列转换为数据框中的多列

Posted

技术标签:

【中文标题】PySpark 将“map”类型的列转换为数据框中的多列【英文标题】:PySpark converting a column of type 'map' to multiple columns in a dataframe 【发布时间】:2016-08-20 12:43:11 【问题描述】:

输入

我有一个Parameters 类型的列map,形式为:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = ['Parameters': 'foo': '1', 'bar': '2', 'baz': 'aaa']
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters='foo': '1', 'bar': '2', 'baz': 'aaa')]

输出

我想在 pyspark 中对其进行重塑,以便所有键(foobar 等)都是列,即:

[Row(foo='1', bar='2', baz='aaa')]

使用withColumn 有效:

(df
 .withColumn('foo', df.Parameters['foo'])
 .withColumn('bar', df.Parameters['bar'])
 .withColumn('baz', df.Parameters['baz'])
 .drop('Parameters')
).collect()

但是我需要一个不明确提及列名的解决方案,因为我有几十个。

架构

>>> df.printSchema()

root
 |-- Parameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

【问题讨论】:

想要的输出是什么? @eliasah 刚刚编辑了 Q 以获得所需的输出 【参考方案1】:

由于MapType 的键不是架构的一部分,您必须首先收集这些,例如:

from pyspark.sql.functions import explode

keys = (df
    .select(explode("Parameters"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect())

当你有了这些后,剩下的就是简单的选择:

from pyspark.sql.functions import col

exprs = [col("Parameters").getItem(k).alias(k) for k in keys]
df.select(*exprs)

【讨论】:

谢谢!这对我有用,但有一个例外。当我打印数据框的模式时 - df.select(*exprs),它将所有数据类型返回为字符串。我有一种数据类型,它是键中的 struct 类型。我怎样才能访问它? @TopCoder topfield.nestedfield? 如果你有 280 个键,你必须变成列,会发生什么?我不断收到消息说它超出了 spark 的开销内存。【参考方案2】:

高效的解决方案

问题限制之一是动态确定列名,这很好,但请注意,这可能真的很慢。以下是您可以避免键入和编写可快速执行的代码的方法。

cols = list(map(
    lambda f: F.col("Parameters").getItem(f).alias(str(f)),
    ["foo", "bar", "baz"]))
df.select(cols).show()
+---+---+---+
|foo|bar|baz|
+---+---+---+
|  1|  2|aaa|
+---+---+---+

请注意,这将运行单个选择操作。不要多次运行withColumn,因为这样会更慢。

只有在您知道所有地图键的情况下才能实现快速解决方案。如果您不知道映射键的所有唯一值,则需要恢复到较慢的解决方案。

较慢的解决方案

接受的答案很好。我的解决方案性能更高一些,因为它不调用 .rddflatMap()

import pyspark.sql.functions as F

d = ['Parameters': 'foo': '1', 'bar': '2', 'baz': 'aaa']
df = spark.createDataFrame(d)

keys_df = df.select(F.explode(F.map_keys(F.col("Parameters")))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: F.col("Parameters").getItem(f).alias(str(f)), keys))
df.select(key_cols).show()
+---+---+---+
|bar|foo|baz|
+---+---+---+
|  2|  1|aaa|
+---+---+---+

将结果收集到驱动程序节点可能是性能瓶颈。最好将此代码 list(map(lambda row: row[0], keys_df.collect())) 作为单独的命令执行,以确保它不会运行得太慢。

【讨论】:

以上是关于PySpark 将“map”类型的列转换为数据框中的多列的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

pyspark 在循环中将数组转换为字符串

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框

将列表的列拆分为同一 PySpark 数据框中的多列

将pyspark数据框的列转换为小写