如何融化 Spark DataFrame?
Posted
技术标签:
【中文标题】如何融化 Spark DataFrame?【英文标题】:How to melt Spark DataFrame? 【发布时间】:2017-05-30 22:05:33 【问题描述】:在 PySpark 或至少在 Scala 中,Apache Spark 中是否有相当于 Pandas Melt 的函数?
到目前为止,我一直在 Python 中运行一个示例数据集,现在我想对整个数据集使用 Spark。
【问题讨论】:
另见unpivot in spark-sql/pyspark和Transpose column to row with Spark 【参考方案1】:投票支持 user6910411 的回答。它按预期工作,但是,它不能很好地处理 None 值。因此我将他的 melt 函数重构为以下内容:
from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable
from itertools import chain
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create map<key: value>
_vars_and_vals = create_map(
list(chain.from_iterable([
[lit(c), col(c)] for c in value_vars]
))
)
_tmp = df.select(*id_vars, explode(_vars_and_vals)) \
.withColumnRenamed('key', var_name) \
.withColumnRenamed('value', value_name)
return _tmp
测试使用以下数据框:
import pandas as pd
pdf = pd.DataFrame('A': 0: 'a', 1: 'b', 2: 'c',
'B': 0: 1, 1: 3, 2: 5,
'C': 0: 2, 1: 4, 2: 6,
'D': 1: 7, 2: 9)
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])
A variable value
0 a B 1.0
1 b B 3.0
2 c B 5.0
3 a C 2.0
4 b C 4.0
5 c C 6.0
6 a D NaN
7 b D 7.0
8 c D 9.0
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1.0|
| a| C| 2.0|
| a| D| NaN|
| b| B| 3.0|
| b| C| 4.0|
| b| D| 7.0|
| c| B| 5.0|
| c| C| 6.0|
| c| D| 9.0|
+---+--------+-----+
【讨论】:
如果我试图传入一个列表,即 value_vars 的“someColumns”,这将如何工作?我收到“不支持的文字类型类”错误。 它非常适合我,干得好! @Budyn:你到底传递了什么 value_vars 参数?我传递了一个字符串列表(列名),如下所示:df_long = melt(df_wide, id_vars=['id', 'date'], value_vars=['t1', 't2', 't3', 't4'])
【参考方案2】:
UPD
最后我找到了最有效的实现方式。它在我的纱线配置中使用集群的所有资源。
from pyspark.sql.functions import explode
def melt(df):
sp = df.columns[1:]
return (df
.rdd
.map(lambda x: [str(x[0]), [(str(i[0]),
float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
preservesPartitioning = True)
.toDF()
.withColumn('_2', explode('_2'))
.rdd.map(lambda x: [str(x[0]),
str(x[1][0]),
float(x[1][1] if x[1][1] else 0)],
preservesPartitioning = True)
.toDF()
)
对于非常宽的数据框,我在 user6910411 答案的 _vars_and_vals 生成时性能下降。
通过 selectExpr 实现熔化很有用
columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
| a| b| c| d| e| f|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 4| 5| 6| 7| 9| 8|
| 7| 8| 9| 1| 2| 4|
| 8| 3| 9| 8| 7| 4|
+---+---+---+---+---+---+
cols = df.columns[1:]
df.selectExpr('a', "stack(, )".format(len(cols), ', '.join(("'', ".format(i, i) for i in cols))))
+---+----+----+
| a|col0|col1|
+---+----+----+
| 1| b| 2|
| 1| c| 3|
| 1| d| 4|
| 1| e| 5|
| 1| f| 6|
| 4| b| 5|
| 4| c| 6|
| 4| d| 7|
| 4| e| 9|
| 4| f| 8|
| 7| b| 8|
| 7| c| 9|
...
【讨论】:
我有一些类型不匹配 无法解决.. 由于数据类型不匹配:参数 2 (DoubleType) != 参数 6 (LongType);第 1 行 pos 0; 。测试表明,堆栈似乎暗示您 col1 的类型基于 col0 的前几个元素。当我们说 col0 的 d 或 f 的值进来时,输入 mismatch。你会怎么解决?我正在尝试 stack(, )".format(len(cols), ', '.join(("'', cast( as bigint)"... 这似乎可行,但不确定它是否是正确且有效的方法。堆叠数百列时我遇到了性能问题,因此效率很重要。 @Kenny 在这种情况下我从未遇到过这样的问题。但是您的解决方案听起来合乎逻辑。您也可以从更新中尝试我的解决方案。【参考方案3】:没有内置函数(如果您使用 SQL 并且启用了 Hive 支持,您可以使用 stack
function,但它没有在 Spark 中公开并且没有本机实现)但是您自己的滚动是微不足道的。所需的进口:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
示例实现:
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
还有一些测试(基于Pandas doctests):
import pandas as pd
pdf = pd.DataFrame('A': 0: 'a', 1: 'b', 2: 'c',
'B': 0: 1, 1: 3, 2: 5,
'C': 0: 2, 1: 4, 2: 6)
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
A variable value
0 a B 1
1 b B 3
2 c B 5
3 a C 2
4 b C 4
5 c C 6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
注意:要与旧版 Python 版本一起使用,请删除类型注释。
相关:
r sparkR - equivalent to melt function Gather in sparklyr【讨论】:
您的代码将反引号添加到列名,然后在withColumn
调用时失败。更多参考在这里(***.com/questions/55781796/…)
与stack
选项相比,它的基准如何?如:df.selectExpr('col1', 'stack(2, "col2", col2, "col3", col3) as (cols, values)')
这不是一个简单的答案。这是一个天才!
惊人的答案。这个功能我用过很多次都没有问题。
这真是太棒了。 explode
适用于包含列表的列,但创建数组 "_vars_and_vals" 作为结构的键值对数组,然后在 explode 内的 withColumn 语句中使用它是一种非常有趣的行为。 @BICube 说了什么!【参考方案4】:
在我在 Spark for Scala 中搜索 melt
的实现时遇到了这个问题。
张贴我的 Scala 端口,以防有人也偶然发现。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
/** Extends the [[org.apache.spark.sql.DataFrame]] class
*
* @param df the data frame to melt
*/
implicit class DataFrameFunctions(df: DataFrame)
/** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
*
* melt is (kind of) the inverse of pivot
* melt is currently (02/2017) not implemented in spark
*
* @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
* @see this is a scala adaptation of http://***.com/questions/41670103/pandas-melt-function-in-apache-spark
*
* @todo method overloading for simple calling
*
* @param id_vars the columns to preserve
* @param value_vars the columns to melt
* @param var_name the name for the column holding the melted columns names
* @param value_name the name for the column holding the values of the melted columns
*
*/
def melt(
id_vars: Seq[String], value_vars: Seq[String],
var_name: String = "variable", value_name: String = "value") : DataFrame =
// Create array<struct<variable: str, value: ...>>
val _vars_and_vals = array((for (c <- value_vars) yield struct(lit(c).alias(var_name), col(c).alias(value_name)) ): _*)
// Add to the DataFrame and explode
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = id_vars.map(col _) ++ for (x <- List(var_name, value_name)) yield col("_vars_and_vals")(x).alias(x)
return _tmp.select(cols: _*)
考虑到Scala
,我还没有那么先进,我相信还有改进的余地。
欢迎任何cmets。
【讨论】:
您的代码没问题,但我建议将for-yield
构造替换为 map
函数,例如: for (x <- List(var_name, value_name)) yield col("_vars_and_vals")(x).alias(x)
与 List(var_name, value_name).map(x => col("_vars_and_vals")(x).alias(x))
相同,for (c <- value_vars) yield struct(lit(c).alias(var_name), col(c).alias(value_name))
可以这样写:value_vars.map(c => struct(lit(c).alias(var_name), col(c).alias(value_name)))
。 scala 中的 for-yield 比 python 中的 for-comprehension 更通用。以上是关于如何融化 Spark DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章