PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?

Posted

技术标签:

【中文标题】PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?【英文标题】:Do PySpark DataFrames have a "pipe" function like in Pandas?PySpark DataFrames 是否具有像 Pandas 中的“管道”功能? 【发布时间】:2020-08-08 00:40:40 【问题描述】:

例如在 Pandas 中我会这样做

data_df = (
     pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3']))
     .pipe(lambda df: df[df.col1 != 'a'])
 )   

这类似于R的管道%>%

PySpark 中有类似的东西吗?

【问题讨论】:

我不这么认为。至少,这不是一个很好的使用方法。在 PySpark 中,您的 DataFrame 分布在多个服务器上。如果有一种行为类似于 Panda 的管道的方法,它需要将来自所有服务器的数据收集到一个单独的服务器中,然后调用 lambda 函数。为什么要使用pipe?你想创建一个新列吗?添加新的?对行/组/整个数据框进行转换或聚合? @MkWTF 此时我真的只想重命名列。就是这样。 你有多种方法可以做到这一点,check this site,学习 spark 真的很好。我也留下pyspark docs here,以备不时之需。 @MkWTF 我有几百列,需要在循环中重命名它们,所以在 pandas 中使用pipe(standardize_col_names) 之类的东西是一个很好的情况。我的主要问题是关于@someshwar-kale 回答的pipe。管道是 Spark 中对应的东西 【参考方案1】:

您可以定义一个“类似pandas”的pipe 方法并将其绑定到DataFrame 类:

from pyspark.sql import DataFrame

def pipe(self, func, *args, **kwargs):
    return func(self, *args, **kwargs)

DataFrame.pipe = pipe 

然后,您可以将函数传递给pipe 方法以应用于pyspark DataFrame。例如,假设您想在更改其列之后从 DataFrame my_df 中选择所有列,除了最后两列。您可以为此使用pipe

my_new_df = (
    my_df
    # Perform some operations to add and/or remove columns
    ... 
    # At this point the list of columns is different 
    # from `my_df.columns`
    .pipe(lambda df: df.select(*df.columns[:-2]))
)

【讨论】:

【参考方案2】:

我认为,在pyspark 中,您可以借助pipeline 轻松实现此管道功能。

    将每个管道函数转换为转换器。 spark提供了一些预定义的转换器,我们也可以使用它 使用转换器创建管道 运行管道以转换提供的数据帧

Example: Let's take the example you provided

输入要转换的数据框

 val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("col1", "col2")
    df.show(false)
    df.printSchema()
    /**
      * +----+----+
      * |col1|col2|
      * +----+----+
      * |a   |1   |
      * |b   |2   |
      * |c   |3   |
      * +----+----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: integer (nullable = false)
      */

1。将每个管道函数转换为转换器

对于.pipe(lambda df: df[df.col1 != 'a']),我们可以轻松使用spark SQLTransformer。所以不需要创建自定义转换器

2。使用转换器创建管道

 val transform1 = new SQLTransformer()
      .setStatement("select * from __THIS__ where col1 != 'a'")
    val transform2 = new SQLTransformer()
      .setStatement("select col1, col2, SQRT(col2) as col3 from __THIS__")

    val pipeline = new Pipeline()
      .setStages(Array(transform1, transform2))

3。运行管道以转换提供的数据帧

pipeline.fit(df).transform(df)
      .show(false)

    /**
      * +----+----+------------------+
      * |col1|col2|col3              |
      * +----+----+------------------+
      * |b   |2   |1.4142135623730951|
      * |c   |3   |1.7320508075688772|
      * +----+----+------------------+
      */

【讨论】:

@0111001101110000 你检查过这个吗? 这让我相信管道是与 pandas 管道功能最好的并行。我想我可以为我的转换制作自己的转换器,但不认为这会增加我的代码的可读性,这是管道函数的要点。【参考方案3】:

在 PySpark 中,管道函数称为转换,文档为 here

行为与 Pandas 管道运算符相同。

所以 PySpark 中的示例看起来像

data_df = (
  spark.createDataFrame(pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3'])))
  .transform(lambda df: df.filter("col1 != 'a'"))
)

【讨论】:

请注意transform 是在 Spark 3.0 中实现的 这是currently working link 到DataFrame.transform 文档。 pyspark.sql.DataFrame.transform 只接受并返回一个 Dataframe,而使用来自 @luiz-otavio-v-b-oliveira 的管道函数扩展 Dataframe 也可以采用任意参数。

以上是关于PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

Pyspark Dataframes:创建要在 python 中的聚类中使用的特征列

如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

Pyspark(Dataframes)逐行读取文件(将行转换为字符串)