在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()

Posted

技术标签:

【中文标题】在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()【英文标题】:Repeated calls to withColumn() using the same function on multiple columns in Pyspark 【发布时间】:2020-05-26 19:34:06 【问题描述】:

我有一个数据框,在其中我通过多个 .withColumn 链重复将相同的过程应用于多个列,就像这样

   df=     dt.withColumn('PL_start', from_unixtime('PL_start', "yyyy-MM-dd HH:mm:ss")) \
            .withColumn('PL_end??'  , from_unixtime('PL_end', "yyyy-MM-dd HH:mm:ss"))   \
            .withColumn('MU_start', from_unixtime('MU_start', "yyyy-MM-dd HH:mm:ss")) \
            .withColumn('MU_end'  , from_unixtime('MU_end', "yyyy-MM-dd HH:mm:ss"))   \
            .withColumn('PU_start', from_unixtime('PU_start', "yyyy-MM-dd HH:mm:ss")) \
            .withColumn('PU_end'  , from_unixtime('PU_end', "yyyy-MM-dd HH:mm:ss"))   \
            .withColumn('RE_start', from_unixtime('RE_start', "yyyy-MM-dd HH:mm:ss")) \
            .withColumn('RE_end'  , from_unixtime('RE_end', "yyyy-MM-dd HH:mm:ss"))   \
            .withColumn(...)

我发现了这个repeated calls to withColumn() using the same function on multiple columns 这个问题有点老了。想知道在新版本的 Spark 2.4.3 中是否有解决此问题的方法?

【问题讨论】:

试试cols=['MU_end','PU_start','PU_end','RE_start']dt.select(*[from_unixtime(x).alias(x) for x in cols]) 如果您希望所有其他列都包含所有新更改的列,cols=['MU_end','PU_start','PU_end','RE_start']dt.select(*[x for x in dt.columns if x not in cols],*[from_unixtime(x).alias(x) for x in cols])) 【参考方案1】:

您可以使用 python reduce 函数来遍历列:

new_df = (reduce(
lambda memo_df, col_name: memo_df.withColumn(col_name, from_unixtime(col(col_name), "yyyy-MM-dd HH:mm:ss")),
df.columns,
df))

如果您的 df 有一些您不需要转换的列 - 将 df.columns 替换为您需要的列序列。

【讨论】:

以上是关于在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中的多个列上应用 MinMaxScaler

在 PySpark 数据框中的组中的列上应用函数

Pyspark - 一次聚合数据框的所有列[重复]

pyspark:groupby 和聚合 avg 和 first 在多个列上

基于另一列中的值的一列上的pyspark滞后函数

当列数未知时,如何在多个列上连接两个表(pyspark)