pyspark:从现有列创建 MapType 列
Posted
技术标签:
【中文标题】pyspark:从现有列创建 MapType 列【英文标题】:pyspark: Create MapType Column from existing columns 【发布时间】:2016-12-22 17:20:47 【问题描述】:我需要根据现有列创建一个新的 Spark DF MapType 列,其中列名是键,值是值。
作为示例 - 我有这个 DF:
rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
('d23d', 1.5, 2.0, 2.2),
('as3d', 2.2, 4.3, 9.0)
])
schema = StructType([StructField('key', StringType(), True),
StructField('metric1', FloatType(), True),
StructField('metric2', FloatType(), True),
StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k| 1.3| 6.3| 7.6|
|d23d| 1.5| 2.0| 2.2|
|as3d| 2.2| 4.3| 9.0|
+----+-------+-------+-------+
到目前为止,我已经可以从这里创建一个 structType:
nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric")
df2 = df.select("key", nameCol)
+----+-------------+
| key| metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]|
+----+-------------+
但我需要的是一个带有 am MapType 的度量列,其中键是列名:
+----+-------------------------+
| key| metric|
+----+-------------------------+
|123k|Map(metric1 -> 1.3, me...|
|d23d|Map(metric1 -> 1.5, me...|
|as3d|Map(metric1 -> 2.2, me...|
+----+-------------------------+
任何提示我如何转换数据?
谢谢!
【问题讨论】:
【参考方案1】:在 Spark 2.0 或更高版本中,您可以使用 create_map
。首先是一些导入:
from pyspark.sql.functions import lit, col, create_map
from itertools import chain
create_map
需要 keys
和 values
的交错序列,例如可以这样创建:
metric = create_map(list(chain(*(
(lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")
并与select
一起使用:
df.select("key", metric)
使用示例数据,结果是:
+----+---------------------------------------------------------+
|key |metric |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6) |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2) |
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0) |
+----+---------------------------------------------------------+
如果您使用较早版本的 Spark,则必须使用 UDF:
from pyspark.sql import Column
from pyspark.sql.functions import struct
from pyspark.sql.types import DataType, DoubleType, StringType, MapType
def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column:
args = [struct(lit(name), col(name)) for name in cols]
as_map_ = udf(
lambda *args: dict(args),
MapType(StringType(), key_type)
)
return as_map_(*args)
可以这样使用:
df.select("key",
as_map(*[name for name in df.columns if "metric" in name]).alias("metric"))
【讨论】:
您的解决方案看起来不错,可以用来回答:***.com/questions/45445077/… 吗? 救了我!谢谢以上是关于pyspark:从现有列创建 MapType 列的主要内容,如果未能解决你的问题,请参考以下文章
Spark DataFrame ArrayType 或 MapType 用于检查列中的值
Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”