PySpark - 使用 withColumnRenamed 重命名多个列
Posted
技术标签:
【中文标题】PySpark - 使用 withColumnRenamed 重命名多个列【英文标题】:PySpark - rename more than one column using withColumnRenamed 【发布时间】:2016-12-12 10:05:00 【问题描述】:我想使用 spark withColumnRenamed 函数更改两列的名称。当然,我可以写:
data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
.withColumnRenamed('x1','x3')
.withColumnRenamed('x2', 'x4'))
但我想一步完成(拥有新名称的列表/元组)。不幸的是,这两个都不是:
data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])
也不是这个:
data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))
正在工作。这样可以吗?
【问题讨论】:
接受的答案是有效的,但请注意其他建议多次致电withColumnRenamed
的答案。对于reasons outlined in this blog post,应避免使用withColumnRenamed
方法。请参阅我的答案以了解更多详细信息。
【参考方案1】:
不能使用单个withColumnRenamed
调用。
你可以使用DataFrame.toDF
方法*
data.toDF('x3', 'x4')
或
new_names = ['x3', 'x4']
data.toDF(*new_names)
也可以用简单的select
重命名:
from pyspark.sql.functions import col
mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
在 Scala 中您也可以:
重命名所有列:
val newNames = Seq("x3", "x4")
data.toDF(newNames: _*)
从映射重命名select
:
val mapping = Map("x1" -> "x3", "x2" -> "x4")
df.select(
df.columns.map(c => df(c).alias(mapping.get(c).getOrElse(c))): _*
)
或foldLeft
+ withColumnRenamed
mapping.foldLeft(data)
case (data, (oldName, newName)) => data.withColumnRenamed(oldName, newName)
* 不要与RDD.toDF
混淆,RDD.toDF
不是可变参数函数,而是将列名作为列表,
【讨论】:
在您的第三个示例data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
:如果您进行方法链接,您将如何编写data.columns
(类似于 col 但引用数据框?
使用 df.select
是在 spark (scala/python) 中执行此操作的正确方法。看看这个:***.com/a/62728542/8551891【参考方案2】:
我也找不到简单的 pyspark 解决方案,所以我自己构建了一个,类似于 pandas 的 df.rename(columns='old_name_1':'new_name_1', 'old_name_2':'new_name_2')
。
import pyspark.sql.functions as F
def rename_columns(df, columns):
if isinstance(columns, dict):
return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
else:
raise ValueError("'columns' should be a dict, like 'old_name_1':'new_name_1', 'old_name_2':'new_name_2'")
所以你的解决方案看起来像data = rename_columns(data, 'x1': 'x3', 'x2': 'x4')
如果你想链接你的方法调用,Spark 3.0 引入了pyspark.sql.DataFrame.transform,你可以通过以下方式使用:
my_df.transform(lambda df: rename_columns(df, 'old_name_1':'new_name_1', 'old_name_2':'new_name_2'))
它为我节省了几行代码,希望对你也有帮助。
【讨论】:
在我不得不在 for 循环中使用 .withColumnRenamed() 修复许多作业的性能之后,3 年后更新了此响应,并使用了更有效的解决方案。抱歉耽搁了。【参考方案3】:为什么要在一行中执行 如果您打印执行计划,它实际上只在单行中完成
data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
.withColumnRenamed('x1','x3')
.withColumnRenamed('x2', 'x4'))
data.explain()
输出
== Physical Plan ==
*(1) Project [x1#1548L AS x3#1552L, x2#1549L AS x4#1555L]
+- Scan ExistingRDD[x1#1548L,x2#1549L]
如果你想用列表元组来做 你可以使用一个简单的地图功能
data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
new_names = [("x1","x3"),("x2","x4")]
data = data.select(list(
map(lambda old,new:F.col(old).alias(new),*zip(*new_names))
))
data.explain()
还是有同样的打算
输出
== Physical Plan ==
*(1) Project [x1#1650L AS x3#1654L, x2#1651L AS x4#1655L]
+- Scan ExistingRDD[x1#1650L,x2#1651L]
【讨论】:
【参考方案4】:如果您想使用带有前缀的相同列名重命名多个列,这应该可以工作
df.select([f.col(c).alias(PREFIX + c) for c in df.columns])
【讨论】:
你写了for c in columns
所以不是df.columns
这给了我一个错误,但如果它有效(对于方法链接)会非常有用。您是如何完成这项工作的?
@corianne1234 要链接和更改列名,请使用transform
df.transform(lambda df2: df2.select([col(acol).alias(acol + '_tmp') for acol in df2.columns]))
【参考方案5】:
最简单的方法如下:
解释:
-
使用 df.columns 获取 pyspark 数据框中的所有列
创建一个循环遍历步骤 1 中每一列的列表
列表将输出:col("col1").alias("col1_x")。仅对所需列执行此操作
*[list] 将解压 pypsark 中 select 语句的列表
from pyspark.sql import functions as F
(df
.select(*[F.col(c).alias(f"c_x") for c in df.columns])
.toPandas().head()
)
希望对你有帮助
【讨论】:
【参考方案6】:我的所有 pyspark 程序都有这个 hack:
import pyspark
def rename_sdf(df, mapper=, **kwargs_mapper):
''' Rename column names of a dataframe
mapper: a dict mapping from the old column names to new names
Usage:
df.rename('old_col_name': 'new_col_name', 'old_col_name2': 'new_col_name2')
df.rename(old_col_name=new_col_name)
'''
for before, after in mapper.items():
df = df.withColumnRenamed(before, after)
for before, after in kwargs_mapper.items():
df = df.withColumnRenamed(before, after)
return df
pyspark.sql.dataframe.DataFrame.rename = rename_sdf
现在您可以轻松地以 pandas 方式重命名任何 spark 数据框!
df.rename('old1':'new1', 'old2':'new2')
【讨论】:
【参考方案7】:zero323 接受的答案是有效的。应该避免大多数其他答案。
这是另一个利用quinn 库并且非常适合生产代码库的高效解决方案:
df = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
def rename_col(s):
mapping = 'x1': 'x3', 'x2': 'x4'
return mapping[s]
actual_df = df.transform(quinn.with_columns_renamed(rename_col))
actual_df.show()
这是输出的 DataFrame:
+---+---+
| x3| x4|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
让我们看一下actual_df.explain(True)
输出的逻辑计划并验证它们是否有效:
== Parsed Logical Plan ==
'Project ['x1 AS x3#52, 'x2 AS x4#53]
+- LogicalRDD [x1#48L, x2#49L], false
== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false
== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false
== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
解析后的逻辑计划和物理计划基本相等,因此 Catalyst 并没有做任何繁重的工作来优化计划。
应避免多次调用withColumnRenamed
,因为它会创建一个需要优化的低效解析计划。
让我们看一个不必要的复杂解析计划:
def rename_columns(df, columns):
for old_name, new_name in columns.items():
df = df.withColumnRenamed(old_name, new_name)
return df
def rename_col(s):
mapping = 'x1': 'x3', 'x2': 'x4'
return mapping[s]
actual_df = rename_columns(df, 'x1': 'x3', 'x2': 'x4')
actual_df.explain(True)
== Parsed Logical Plan ==
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
+- LogicalRDD [x1#48L, x2#49L], false
== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
+- LogicalRDD [x1#48L, x2#49L], false
== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
+- LogicalRDD [x1#48L, x2#49L], false
== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
【讨论】:
【参考方案8】:您还可以使用Dictionary 遍历要重命名的列。
示例
a_dict = 'sum_gb': 'sum_mbUsed', 'number_call': 'sum_call_date'
for key, value in a_dict.items():
df= df.withColumnRenamed(value,key)
【讨论】:
【参考方案9】:你应该使用这个函数:
def spark_rename_from_dict(df, rename_dict):
newcols = [rename_dict.get(i,i) for i in df.columns]
df = df.toDF(*newcols)
在这里,您的重命名字典是对 df.columns
子集的映射。推荐这种方法,因为它不会创建多个数据帧
【讨论】:
以上是关于PySpark - 使用 withColumnRenamed 重命名多个列的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark-SQL 与 Pyspark 使用 Delta 格式的查询表有啥区别?
Pyspark - 使用 python 或 pyspark 转换 excel 文件的行和列