PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?
Posted
技术标签:
【中文标题】PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?【英文标题】:Do PySpark DataFrames have a "pipe" function like in Pandas?PySpark DataFrames 是否具有像 Pandas 中的“管道”功能? 【发布时间】:2020-08-08 00:40:40 【问题描述】:例如在 Pandas 中我会这样做
data_df = (
pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3']))
.pipe(lambda df: df[df.col1 != 'a'])
)
这类似于R的管道%>%
PySpark 中有类似的东西吗?
【问题讨论】:
我不这么认为。至少,这不是一个很好的使用方法。在 PySpark 中,您的 DataFrame 分布在多个服务器上。如果有一种行为类似于 Panda 的管道的方法,它需要将来自所有服务器的数据收集到一个单独的服务器中,然后调用 lambda 函数。为什么要使用pipe
?你想创建一个新列吗?添加新的?对行/组/整个数据框进行转换或聚合?
@MkWTF 此时我真的只想重命名列。就是这样。
你有多种方法可以做到这一点,check this site,学习 spark 真的很好。我也留下pyspark docs here,以备不时之需。
@MkWTF 我有几百列,需要在循环中重命名它们,所以在 pandas 中使用pipe(standardize_col_names)
之类的东西是一个很好的情况。我的主要问题是关于@someshwar-kale 回答的pipe
。管道是 Spark 中对应的东西
【参考方案1】:
您可以定义一个“类似pandas”的pipe
方法并将其绑定到DataFrame 类:
from pyspark.sql import DataFrame
def pipe(self, func, *args, **kwargs):
return func(self, *args, **kwargs)
DataFrame.pipe = pipe
然后,您可以将函数传递给pipe
方法以应用于pyspark DataFrame。例如,假设您想在更改其列之后从 DataFrame my_df
中选择所有列,除了最后两列。您可以为此使用pipe
:
my_new_df = (
my_df
# Perform some operations to add and/or remove columns
...
# At this point the list of columns is different
# from `my_df.columns`
.pipe(lambda df: df.select(*df.columns[:-2]))
)
【讨论】:
【参考方案2】:我认为,在pyspark
中,您可以借助pipeline
轻松实现此管道功能。
-
将每个管道函数转换为转换器。 spark提供了一些预定义的转换器,我们也可以使用它
使用转换器创建管道
运行管道以转换提供的数据帧
Example: Let's take the example you provided
输入要转换的数据框
val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("col1", "col2")
df.show(false)
df.printSchema()
/**
* +----+----+
* |col1|col2|
* +----+----+
* |a |1 |
* |b |2 |
* |c |3 |
* +----+----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: integer (nullable = false)
*/
1。将每个管道函数转换为转换器
对于.pipe(lambda df: df[df.col1 != 'a'])
,我们可以轻松使用spark SQLTransformer
。所以不需要创建自定义转换器
2。使用转换器创建管道
val transform1 = new SQLTransformer()
.setStatement("select * from __THIS__ where col1 != 'a'")
val transform2 = new SQLTransformer()
.setStatement("select col1, col2, SQRT(col2) as col3 from __THIS__")
val pipeline = new Pipeline()
.setStages(Array(transform1, transform2))
3。运行管道以转换提供的数据帧
pipeline.fit(df).transform(df)
.show(false)
/**
* +----+----+------------------+
* |col1|col2|col3 |
* +----+----+------------------+
* |b |2 |1.4142135623730951|
* |c |3 |1.7320508075688772|
* +----+----+------------------+
*/
【讨论】:
@0111001101110000 你检查过这个吗? 这让我相信管道是与 pandas 管道功能最好的并行。我想我可以为我的转换制作自己的转换器,但不认为这会增加我的代码的可读性,这是管道函数的要点。【参考方案3】:在 PySpark 中,管道函数称为转换,文档为 here
行为与 Pandas 管道运算符相同。
所以 PySpark 中的示例看起来像
data_df = (
spark.createDataFrame(pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3'])))
.transform(lambda df: df.filter("col1 != 'a'"))
)
【讨论】:
请注意transform
是在 Spark 3.0 中实现的
这是currently working link 到DataFrame.transform
文档。
pyspark.sql.DataFrame.transform 只接受并返回一个 Dataframe,而使用来自 @luiz-otavio-v-b-oliveira 的管道函数扩展 Dataframe 也可以采用任意参数。以上是关于PySpark DataFrames 是不是具有像 Pandas 中的“管道”功能?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?
PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe
Pyspark Dataframes:创建要在 python 中的聚类中使用的特征列