PySpark - 使用 withColumnRenamed 重命名多个列

Posted

技术标签:

【中文标题】PySpark - 使用 withColumnRenamed 重命名多个列【英文标题】:PySpark - rename more than one column using withColumnRenamed 【发布时间】:2016-12-12 10:05:00 【问题描述】:

我想使用 spark withColumnRenamed 函数更改两列的名称。当然,我可以写:

data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
       .withColumnRenamed('x1','x3')
       .withColumnRenamed('x2', 'x4'))

但我想一步完成(拥有新名称的列表/元组)。不幸的是,这两个都不是:

data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])

也不是这个:

data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))

正在工作。这样可以吗?

【问题讨论】:

接受的答案是有效的,但请注意其他建议多次致电withColumnRenamed 的答案。对于reasons outlined in this blog post,应避免使用withColumnRenamed 方法。请参阅我的答案以了解更多详细信息。 【参考方案1】:

不能使用单个withColumnRenamed 调用。

你可以使用DataFrame.toDF方法*

data.toDF('x3', 'x4')

new_names = ['x3', 'x4']
data.toDF(*new_names)

也可以用简单的select重命名:

from pyspark.sql.functions import col

mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])

在 Scala 中您也可以:

重命名所有列:

val newNames = Seq("x3", "x4")

data.toDF(newNames: _*)

从映射重命名select:

val  mapping = Map("x1" -> "x3", "x2" -> "x4")

df.select(
  df.columns.map(c => df(c).alias(mapping.get(c).getOrElse(c))): _*
)

foldLeft + withColumnRenamed

mapping.foldLeft(data)
  case (data, (oldName, newName)) => data.withColumnRenamed(oldName, newName) 


* 不要与RDD.toDF 混淆,RDD.toDF 不是可变参数函数,而是将列名作为列表,

【讨论】:

在您的第三个示例data.select([col(c).alias(mapping.get(c, c)) for c in data.columns]):如果您进行方法链接,您将如何编写data.columns(类似于 col 但引用数据框? 使用 df.select 是在 spark (scala/python) 中执行此操作的正确方法。看看这个:***.com/a/62728542/8551891【参考方案2】:

我也找不到简单的 pyspark 解决方案,所以我自己构建了一个,类似于 pandas 的 df.rename(columns='old_name_1':'new_name_1', 'old_name_2':'new_name_2')

import pyspark.sql.functions as F

def rename_columns(df, columns):
    if isinstance(columns, dict):
        return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("'columns' should be a dict, like 'old_name_1':'new_name_1', 'old_name_2':'new_name_2'")

所以你的解决方案看起来像data = rename_columns(data, 'x1': 'x3', 'x2': 'x4')

如果你想链接你的方法调用,Spark 3.0 引入了pyspark.sql.DataFrame.transform,你可以通过以下方式使用:

my_df.transform(lambda df: rename_columns(df, 'old_name_1':'new_name_1', 'old_name_2':'new_name_2'))

它为我节省了几行代码,希望对你也有帮助。

【讨论】:

在我不得不在 for 循环中使用 .withColumnRenamed() 修复许多作业的性能之后,3 年后更新了此响应,并使用了更有效的解决方案。抱歉耽搁了。【参考方案3】:

为什么要在一行中执行 如果您打印执行计划,它实际上只在单行中完成

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
   .withColumnRenamed('x1','x3')
   .withColumnRenamed('x2', 'x4'))
data.explain()

输出

== Physical Plan ==
*(1) Project [x1#1548L AS x3#1552L, x2#1549L AS x4#1555L]
+- Scan ExistingRDD[x1#1548L,x2#1549L]

如果你想用列表元组来做 你可以使用一个简单的地图功能

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
new_names = [("x1","x3"),("x2","x4")]
data = data.select(list(
       map(lambda old,new:F.col(old).alias(new),*zip(*new_names))
       ))

data.explain()

还是有同样的打算

输出

== Physical Plan ==
*(1) Project [x1#1650L AS x3#1654L, x2#1651L AS x4#1655L]
+- Scan ExistingRDD[x1#1650L,x2#1651L]

【讨论】:

【参考方案4】:

如果您想使用带有前缀的相同列名重命名多个列,这应该可以工作

df.select([f.col(c).alias(PREFIX + c) for c in df.columns])

【讨论】:

你写了for c in columns 所以不是df.columns 这给了我一个错误,但如果它有效(对于方法链接)会非常有用。您是如何完成这项工作的? @corianne1234 要链接和更改列名,请使用transform df.transform(lambda df2: df2.select([col(acol).alias(acol + '_tmp') for acol in df2.columns]))【参考方案5】:

最简单的方法如下:

解释:

    使用 df.columns 获取 pyspark 数据框中的所有列 创建一个循环遍历步骤 1 中每一列的列表 列表将输出:col("col1").alias("col1_x")。仅对所需列执行此操作 *[list] 将解压 pypsark 中 select 语句的列表

from pyspark.sql import functions as F (df .select(*[F.col(c).alias(f"c_x") for c in df.columns]) .toPandas().head() )

希望对你有帮助

【讨论】:

【参考方案6】:

我的所有 pyspark 程序都有这个 hack:

import pyspark
def rename_sdf(df, mapper=, **kwargs_mapper):
    ''' Rename column names of a dataframe
        mapper: a dict mapping from the old column names to new names
        Usage:
            df.rename('old_col_name': 'new_col_name', 'old_col_name2': 'new_col_name2')
            df.rename(old_col_name=new_col_name)
    '''
    for before, after in mapper.items():
        df = df.withColumnRenamed(before, after)
    for before, after in kwargs_mapper.items():
        df = df.withColumnRenamed(before, after)
    return df
pyspark.sql.dataframe.DataFrame.rename = rename_sdf

现在您可以轻松地以 pandas 方式重命名任何 spark 数据框!

df.rename('old1':'new1', 'old2':'new2')

【讨论】:

【参考方案7】:

zero323 接受的答案是有效的。应该避免大多数其他答案。

这是另一个利用quinn 库并且非常适合生产代码库的高效解决方案:

df = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
def rename_col(s):
    mapping = 'x1': 'x3', 'x2': 'x4'
    return mapping[s]
actual_df = df.transform(quinn.with_columns_renamed(rename_col))
actual_df.show()

这是输出的 DataFrame:

+---+---+
| x3| x4|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

让我们看一下actual_df.explain(True)输出的逻辑计划并验证它们是否有效:

== Parsed Logical Plan ==
'Project ['x1 AS x3#52, 'x2 AS x4#53]
+- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#53L]

解析后的逻辑计划和物理计划基本相等,因此 Catalyst 并没有做任何繁重的工作来优化计划。

应避免多次调用withColumnRenamed,因为它会创建一个需要优化的低效解析计划。

让我们看一个不必要的复杂解析计划:

def rename_columns(df, columns):
    for old_name, new_name in columns.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df

def rename_col(s):
    mapping = 'x1': 'x3', 'x2': 'x4'
    return mapping[s]
actual_df = rename_columns(df, 'x1': 'x3', 'x2': 'x4')
actual_df.explain(True)
== Parsed Logical Plan ==
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#55L]

【讨论】:

【参考方案8】:

您还可以使用Dictionary 遍历要重命名的列。

示例

a_dict = 'sum_gb': 'sum_mbUsed', 'number_call': 'sum_call_date'

for key, value in a_dict.items():
    df= df.withColumnRenamed(value,key)

【讨论】:

【参考方案9】:

你应该使用这个函数:

def spark_rename_from_dict(df, rename_dict):
    newcols = [rename_dict.get(i,i) for i in df.columns]
    df = df.toDF(*newcols)

在这里,您的重命名字典是对 df.columns 子集的映射。推荐这种方法,因为它不会创建多个数据帧

【讨论】:

以上是关于PySpark - 使用 withColumnRenamed 重命名多个列的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark-SQL 与 Pyspark 使用 Delta 格式的查询表有啥区别?

Pyspark - 使用 python 或 pyspark 转换 excel 文件的行和列

避免在 pyspark 代码中使用 collect() 函数的最佳方法是啥?编写优化pyspark代码的最佳方法?

pyspark使用ipython

无法使用 pyspark 写入 hdfs

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧