如何在 pyspark 中验证 Dataframe 的架构(列的编号和名称)?

Posted

技术标签:

【中文标题】如何在 pyspark 中验证 Dataframe 的架构(列的编号和名称)?【英文标题】:How can I verify the schema (number and name of the columns) of a Dataframe in pyspark? 【发布时间】:2021-12-06 16:11:03 【问题描述】:

我必须读取一个 csv 文件,并且我必须验证数据框的名称和列数。最小列数为 3,它们必须是:“id”、“name”和“phone”。没有比这更多的列的问题。但它总是需要至少有 3 列具有确切的名称。否则程序会失败。

例如: 正确:

+-----+-----+-----+   +-----+-----+-----+-----+
|   id| name|phone|   |   id| name|phone|unit |
+-----+-----+-----+   +-----+-----+-----+-----+
|3940A|jhon |1345 |   |3940A|jhon |1345 | 222 |
|2BB56|mike | 492 |   |2BB56|mike | 492 | 333 |
|3(401|jose |2938 |   |3(401|jose |2938 | 444 |
+-----+-----+-----+   +-----+-----+-----+-----+

不正确:

+-----+-----+-----+   +-----+-----+
|  sku| nomb|phone|   |  sku| name|
+-----+-----+-----+   +-----+-----+
|3940A|jhon |1345 |   |3940A|jhon |
|2BB56|mike | 492 |   |2BB56|mike |
|3(401|jose |2938 |   |3(401|jose |
+-----+-----+-----+   +-----+-----+

【问题讨论】:

所以基本上你只是想看看数据框中是否存在三列? 基本上是的,@JacobCelestine 【参考方案1】:

使用简单的python if-else 语句应该可以完成这项工作:

mandatory_cols = ["id", "name", "phone"]

if all(c in df.columns for c in mandatory_cols):
    # your logic
else:
    raise ValueError("missing columns!")

【讨论】:

【参考方案2】:

这是一个关于如何检查数据框中是否存在列的示例:

from pyspark.sql import Row


def check_columns_exits(cols):
    if 'id' in cols and 'name' in cols and 'phone' in cols:
        print("All required columns are present")
    else:
        print("Does not have all the required columns")


data = [Row(id="3940A", name="john", phone="1345", unit=222),
        Row(id="2BB56", name="mike", phone="492", unit=333)]
df = spark.createDataFrame(data)
check_columns_exits(df.columns)

data1 = [Row(id="3940A", name="john", unit=222),
         Row(id="2BB56", name="mike", unit=333)]
df1 = spark.createDataFrame(data1)
check_columns_exits(df1.columns)

结果:

All required columns are present
Does not have all the required columns

【讨论】:

以上是关于如何在 pyspark 中验证 Dataframe 的架构(列的编号和名称)?的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中更改DataFrame的hdfs块大小

如何在 jupyter 中像 pandas Dataframe 一样打印 Pyspark Dataframe

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

在 Pyspark 中合并 DataFrame

如何在pyspark中查找Dataframe列是一对一或一对多映射?

连接后如何在 Pyspark Dataframe 中选择和排序多个列