在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()
Posted
技术标签:
【中文标题】在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()【英文标题】:Repeated calls to withColumn() using the same function on multiple columns in Pyspark 【发布时间】:2020-05-26 19:34:06 【问题描述】:我有一个数据框,在其中我通过多个 .withColumn 链重复将相同的过程应用于多个列,就像这样
df= dt.withColumn('PL_start', from_unixtime('PL_start', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('PL_end??' , from_unixtime('PL_end', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('MU_start', from_unixtime('MU_start', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('MU_end' , from_unixtime('MU_end', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('PU_start', from_unixtime('PU_start', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('PU_end' , from_unixtime('PU_end', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('RE_start', from_unixtime('RE_start', "yyyy-MM-dd HH:mm:ss")) \
.withColumn('RE_end' , from_unixtime('RE_end', "yyyy-MM-dd HH:mm:ss")) \
.withColumn(...)
我发现了这个repeated calls to withColumn() using the same function on multiple columns 这个问题有点老了。想知道在新版本的 Spark 2.4.3 中是否有解决此问题的方法?
【问题讨论】:
试试cols=['MU_end','PU_start','PU_end','RE_start']
dt.select(*[from_unixtime(x).alias(x) for x in cols])
如果您希望所有其他列都包含所有新更改的列,cols=['MU_end','PU_start','PU_end','RE_start']
dt.select(*[x for x in dt.columns if x not in cols],*[from_unixtime(x).alias(x) for x in cols]))
【参考方案1】:
您可以使用 python reduce 函数来遍历列:
new_df = (reduce(
lambda memo_df, col_name: memo_df.withColumn(col_name, from_unixtime(col(col_name), "yyyy-MM-dd HH:mm:ss")),
df.columns,
df))
如果您的 df 有一些您不需要转换的列 - 将 df.columns 替换为您需要的列序列。
【讨论】:
以上是关于在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中的多个列上应用 MinMaxScaler