pyspark将列添加到列表中已经不存在的数据框

Posted

技术标签:

【中文标题】pyspark将列添加到列表中已经不存在的数据框【英文标题】:pyspark adding columns to dataframe that are already not present from a list 【发布时间】:2021-12-01 22:19:56 【问题描述】:

我有一个数据框,其中包含来自源文件的列不一致,每次加载都可以添加或删除新列

我为所需的列创建了一个列表,我正在尝试通过检查我的列表来添加数据框中已经缺少的新列

req_cols = ["A","B","C","D","E","F","G"]
df.show()
#+---+-----+---+---+----+
#| A|   B  | C | D |  E |
#+---+-----+---+---+----+
#| 5 | 10  | 8 | 9 |  0 |
#+---+-----+---+---+----+

如果数据框存在,我现在检查列是否存在,如果不存在,我打算添加

for cols in req_cols:
    if cols not in df.columns:
         df = df.withColumns(cols,lit(None))

我遇到了一个错误,它说 cols 应该是一个字符串或一个有效的 spark 列,我做错了什么?我的数据框是否也一直覆盖?我可以使用什么替代解决方案?

添加缺少的 2 列后我需要的输出

#+---+-----+---+---+----+-----+-----+
#| A|   B  | C | D |  E | F   |  G  |
#+---+-----+---+---+----+-----+-----+
#| 5 | 10  | 8 |9  |  0 |     |     |
#+---+-----+---+---+----+-----+-----+

【问题讨论】:

【参考方案1】:

应该是df.withColumn,没有s

以下对我有用:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
data = [
    "A": 5, "B": 10, "C": 8, "D": 9, "E": 0,
]

df = spark.createDataFrame(data)
req_cols = ["A", "B", "C", "D", "E", "F", "G"]
for col in req_cols:
    if col not in df.columns:
        df = df.withColumn(col, F.lit(None))

结果:

+---+---+---+---+---+----+----+                                                 
|A  |B  |C  |D  |E  |F   |G   |
+---+---+---+---+---+----+----+
|5  |10 |8  |9  |0  |null|null|
+---+---+---+---+---+----+----+

【讨论】:

更健壮的方法是为新列提供数据类型,例如,df = df.withColumn(col, F.lit(None).cast('integer')) 为什么需要在df.withColumn(col, F.lit(None).cast('integer'))中使用'F' @RData from pyspark.sql.functions import * 不是一个好的做法,因为它可以覆盖 python 函数并导致不必要的问题(这通常是一个不好的做法,不仅仅是 pyspark)。始终使用from pyspark.sql import functions as F,然后使用F.<function>

以上是关于pyspark将列添加到列表中已经不存在的数据框的主要内容,如果未能解决你的问题,请参考以下文章

将列动态添加到数据网格或使用列表中的特定列生成数据网格

怎么解决 ? (将列表添加到列数据框pyspark)

Pyspark 将列列表转换为聚合函数

通过使用pyspark将列转换为行来解析数据框中的Json字符串列表?

如何将列添加到已经存在的超网格?

从另一个 DataFrame 将列添加到 Pyspark DataFrame