在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同
Posted
技术标签:
【中文标题】在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同【英文标题】:Spark's .count() function is different to the contents of the dataframe when filtering on corrupt record field 【发布时间】:2018-05-01 18:33:44 【问题描述】:我有一个用 Python 编写的 Spark 作业,在检查其数据中的错误时会出现奇怪的行为。简化版如下:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
SCHEMA = StructType([
StructField("headerDouble", DoubleType(), False),
StructField("ErrorField", StringType(), False)
])
dataframe = (
spark.read
.option("header", "true")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "ErrorField")
.schema(SCHEMA).csv("./x.csv")
)
total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
它正在读取的 csv 很简单:
headerDouble
wrong
这个的相关输出是
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
| null| wrong|
+------------+----------+
errors count = 0
现在这怎么可能发生?如果数据帧有记录,怎么算为 0?这是 Spark 基础架构中的错误还是我遗漏了什么?
编辑:看起来这可能是 Spark 2.2 上的一个已知错误,已在 Spark 2.3 中修复 - https://issues.apache.org/jira/browse/SPARK-21610
【问题讨论】:
绝对是一个错误。基于可能在下推机制中某处的执行计划和行为。 Conisderdataframe.select(col("ErrorField").isNotNull()).show()
与 dataframe.select("*", col("ErrorField").isNotNull()).show()
...
【参考方案1】:
感谢@user6910411 - 似乎确实是一个错误。我在 Spark 项目的错误跟踪器中提出了一个问题。
我推测 Spark 会因为架构中存在 ErrorField
而感到困惑,该架构也被指定为错误列并用于过滤数据帧。
同时,我想我已经找到了一种解决方法,可以以合理的速度计算数据帧行数:
def count_df_with_spark_bug_workaround(df):
return sum(1 for _ in df.toLocalIterator())
不太清楚为什么当.count()
不起作用时这会给出正确的答案。
我提出的 Jira 票证: https://issues.apache.org/jira/browse/SPARK-24147
这原来是重复的: https://issues.apache.org/jira/browse/SPARK-21610
【讨论】:
您使用的是哪个 spark 版本?我问是因为我收到了AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column
谢谢,这是 Spark 2.2-point-something。从在 2.3 分支中给出错误消息的意义上来说,这似乎已经“修复”了。以上是关于在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同的主要内容,如果未能解决你的问题,请参考以下文章