如何融化 Spark DataFrame?

Posted

技术标签:

【中文标题】如何融化 Spark DataFrame?【英文标题】:How to melt Spark DataFrame? 【发布时间】:2017-05-30 22:05:33 【问题描述】:

在 PySpark 或至少在 Scala 中,Apache Spark 中是否有相当于 Pandas Melt 的函数?

到目前为止,我一直在 Python 中运行一个示例数据集,现在我想对整个数据集使用 Spark。

【问题讨论】:

另见unpivot in spark-sql/pyspark和Transpose column to row with Spark 【参考方案1】:

投票支持 user6910411 的回答。它按预期工作,但是,它不能很好地处理 None 值。因此我将他的 melt 函数重构为以下内容:

from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp

测试使用以下数据框:

import pandas as pd

pdf = pd.DataFrame('A': 0: 'a', 1: 'b', 2: 'c',
                   'B': 0: 1, 1: 3, 2: 5,
                   'C': 0: 2, 1: 4, 2: 6,
                   'D': 1: 7, 2: 9)

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])

A   variable    value
0   a   B   1.0
1   b   B   3.0
2   c   B   5.0
3   a   C   2.0
4   b   C   4.0
5   c   C   6.0
6   a   D   NaN
7   b   D   7.0
8   c   D   9.0

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+

【讨论】:

如果我试图传入一个列表,即 value_vars 的“someColumns”,这将如何工作?我收到“不支持的文字类型类”错误。 它非常适合我,干得好! @Budyn:你到底传递了什么 value_vars 参数?我传递了一个字符串列表(列名),如下所示:df_long = melt(df_wide, id_vars=['id', 'date'], value_vars=['t1', 't2', 't3', 't4'])【参考方案2】:

UPD

最后我找到了最有效的实现方式。它在我的纱线配置中使用集群的所有资源。

from pyspark.sql.functions import explode
def melt(df):
    sp = df.columns[1:]
    return (df
            .rdd
            .map(lambda x: [str(x[0]), [(str(i[0]), 
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]], 
                 preservesPartitioning = True)
            .toDF()
            .withColumn('_2', explode('_2'))
            .rdd.map(lambda x: [str(x[0]), 
                                str(x[1][0]), 
                                float(x[1][1] if x[1][1] else 0)], 
                     preservesPartitioning = True)
            .toDF()
            )

对于非常宽的数据框,我在 user6910411 答案的 _vars_and_vals 生成时性能下降。

通过 selectExpr 实现熔化很有用

columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack(, )".format(len(cols), ', '.join(("'', ".format(i, i) for i in cols))))
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...

【讨论】:

我有一些类型不匹配 无法解决.. 由于数据类型不匹配:参数 2 (DoubleType) != 参数 6 (LongType);第 1 行 pos 0; 。测试表明,堆栈似乎暗示您 col1 的类型基于 col0 的前几个元素。当我们说 col0 的 d 或 f 的值进来时,输入 mismatch。你会怎么解决?我正在尝试 stack(, )".format(len(cols), ', '.join(("'', cast( as bigint)"... 这似乎可行,但不确定它是否是正确且有效的方法。堆叠数百列时我遇到了性能问题,因此效率很重要。 @Kenny 在这种情况下我从未遇到过这样的问题。但是您的解决方案听起来合乎逻辑。您也可以从更新中尝试我的解决方案。【参考方案3】:

没有内置函数(如果您使用 SQL 并且启用了 Hive 支持,您可以使用 stack function,但它没有在 Spark 中公开并且没有本机实现)但是您自己的滚动是微不足道的。所需的进口:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

示例实现:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

还有一些测试(基于Pandas doctests):

import pandas as pd

pdf = pd.DataFrame('A': 0: 'a', 1: 'b', 2: 'c',
                   'B': 0: 1, 1: 3, 2: 5,
                   'C': 0: 2, 1: 4, 2: 6)

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

注意:要与旧版 Python 版本一起使用,请删除类型注释。

相关:

r sparkR - equivalent to melt function Gather in sparklyr

【讨论】:

您的代码将反引号添加到列名,然后在 withColumn 调用时失败。更多参考在这里(***.com/questions/55781796/…) stack 选项相比,它的基准如何?如:df.selectExpr('col1', 'stack(2, "col2", col2, "col3", col3) as (cols, values)') 这不是一个简单的答案。这是一个天才! 惊人的答案。这个功能我用过很多次都没有问题。 这真是太棒了。 explode 适用于包含列表的列,但创建数组 "_vars_and_vals" 作为结构的键值对数组,然后在 explode 内的 withColumn 语句中使用它是一种非常有趣的行为。 @BICube 说了什么!【参考方案4】:

在我在 Spark for Scala 中搜索 melt 的实现时遇到了这个问题。

张贴我的 Scala 端口,以防有人也偶然发现。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) 

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://***.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = 

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield  struct(lit(c).alias(var_name), col(c).alias(value_name)) ): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++  for (x <- List(var_name, value_name)) yield  col("_vars_and_vals")(x).alias(x) 

        return _tmp.select(cols: _*)

    

考虑到Scala,我还没有那么先进,我相信还有改进的余地。

欢迎任何cmets。

【讨论】:

您的代码没问题,但我建议将 for-yield 构造替换为 map 函数,例如: for (x &lt;- List(var_name, value_name)) yield col("_vars_and_vals")(x).alias(x) List(var_name, value_name).map(x =&gt; col("_vars_and_vals")(x).alias(x)) 相同,for (c &lt;- value_vars) yield struct(lit(c).alias(var_name), col(c).alias(value_name)) 可以这样写:value_vars.map(c =&gt; struct(lit(c).alias(var_name), col(c).alias(value_name)))scala 中的 for-yield 比 python 中的 for-comprehension 更通用。

以上是关于如何融化 Spark DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

如果使用融化的 data.table,如何使用 ggplot2 向条形图添加标签?

如何旋转/取消旋转(投射/融化)数据框? [复制]

如何融化 R data.frame 并按条形图绘制组

Spark2 DataFrame数据框常用操作

spark dataFrame 相关知识点

如何将 spark DataFrame 转换为 RDD mllib LabeledPoints?