pyspark将列添加到列表中已经不存在的数据框
Posted
技术标签:
【中文标题】pyspark将列添加到列表中已经不存在的数据框【英文标题】:pyspark adding columns to dataframe that are already not present from a list 【发布时间】:2021-12-01 22:19:56 【问题描述】:我有一个数据框,其中包含来自源文件的列不一致,每次加载都可以添加或删除新列
我为所需的列创建了一个列表,我正在尝试通过检查我的列表来添加数据框中已经缺少的新列
req_cols = ["A","B","C","D","E","F","G"]
df.show()
#+---+-----+---+---+----+
#| A| B | C | D | E |
#+---+-----+---+---+----+
#| 5 | 10 | 8 | 9 | 0 |
#+---+-----+---+---+----+
如果数据框存在,我现在检查列是否存在,如果不存在,我打算添加
for cols in req_cols:
if cols not in df.columns:
df = df.withColumns(cols,lit(None))
我遇到了一个错误,它说 cols 应该是一个字符串或一个有效的 spark 列,我做错了什么?我的数据框是否也一直覆盖?我可以使用什么替代解决方案?
添加缺少的 2 列后我需要的输出
#+---+-----+---+---+----+-----+-----+
#| A| B | C | D | E | F | G |
#+---+-----+---+---+----+-----+-----+
#| 5 | 10 | 8 |9 | 0 | | |
#+---+-----+---+---+----+-----+-----+
【问题讨论】:
【参考方案1】:应该是df.withColumn
,没有s
。
以下对我有用:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
data = [
"A": 5, "B": 10, "C": 8, "D": 9, "E": 0,
]
df = spark.createDataFrame(data)
req_cols = ["A", "B", "C", "D", "E", "F", "G"]
for col in req_cols:
if col not in df.columns:
df = df.withColumn(col, F.lit(None))
结果:
+---+---+---+---+---+----+----+
|A |B |C |D |E |F |G |
+---+---+---+---+---+----+----+
|5 |10 |8 |9 |0 |null|null|
+---+---+---+---+---+----+----+
【讨论】:
更健壮的方法是为新列提供数据类型,例如,df = df.withColumn(col, F.lit(None).cast('integer'))
为什么需要在df.withColumn(col, F.lit(None).cast('integer'))中使用'F'
@RData from pyspark.sql.functions import *
不是一个好的做法,因为它可以覆盖 python 函数并导致不必要的问题(这通常是一个不好的做法,不仅仅是 pyspark)。始终使用from pyspark.sql import functions as F
,然后使用F.<function>
。以上是关于pyspark将列添加到列表中已经不存在的数据框的主要内容,如果未能解决你的问题,请参考以下文章