在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同

Posted

技术标签:

【中文标题】在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同【英文标题】:Spark's .count() function is different to the contents of the dataframe when filtering on corrupt record field 【发布时间】:2018-05-01 18:33:44 【问题描述】:

我有一个用 Python 编写的 Spark 作业,在检查其数据中的错误时会出现奇怪的行为。简化版如下:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])

dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)

total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))

errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))

它正在读取的 csv 很简单:

headerDouble
wrong

这个的相关输出是

total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
|        null|     wrong|
+------------+----------+

errors count = 0

现在这怎么可能发生?如果数据帧有记录,怎么算为 0?这是 Spark 基础架构中的错误还是我遗漏了什么?

编辑:看起来这可能是 Spark 2.2 上的一个已知错误,已在 Spark 2.3 中修复 - https://issues.apache.org/jira/browse/SPARK-21610

【问题讨论】:

绝对是一个错误。基于可能在下推机制中某处的执行计划和行为。 Conisder dataframe.select(col("ErrorField").isNotNull()).show()dataframe.select("*", col("ErrorField").isNotNull()).show()... 【参考方案1】:

感谢@user6910411 - 似乎确实是一个错误。我在 Spark 项目的错误跟踪器中提出了一个问题。

我推测 Spark 会因为架构中存在 ErrorField 而感到困惑,该架构也被指定为错误列并用于过滤数据帧。

同时,我想我已经找到了一种解决方法,可以以合理的速度计算数据帧行数:

def count_df_with_spark_bug_workaround(df):
    return sum(1 for _ in df.toLocalIterator())

不太清楚为什么当.count() 不起作用时这会给出正确的答案。

我提出的 Jira 票证: https://issues.apache.org/jira/browse/SPARK-24147

这原来是重复的: https://issues.apache.org/jira/browse/SPARK-21610

【讨论】:

您使用的是哪个 spark 版本?我问是因为我收到了AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column 谢谢,这是 Spark 2.2-point-something。从在 2.3 分支中给出错误消息的意义上来说,这似乎已经“修复”了。

以上是关于在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同的主要内容,如果未能解决你的问题,请参考以下文章

Spark Dataframe 中的过滤操作

一次sparksql问题排查记录

在 Spark 中过滤有效和无效记录

处理“不是有效的书签。”错误,记录已损坏

使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]

Spark/Glue:.count() 或在约 20MM 记录和 1 个工作人员的数据帧上生成字段列表时的性能问题