将 JSON 文件读入 Spark 时出现 _corrupt_record 错误

Posted

技术标签:

【中文标题】将 JSON 文件读入 Spark 时出现 _corrupt_record 错误【英文标题】:_corrupt_record error when reading a JSON file into Spark 【发布时间】:2016-05-26 08:57:37 【问题描述】:

我有这个 JSON 文件


    "a": 1, 
    "b": 2

通过Python json.dump 方法获得。 现在,我想使用 pyspark 将此文件读入 Spark 中的 DataFrame。按照文档,我正在这样做

sc = SparkContext()

sqlc = SQLContext(sc)

df = sqlc.read.json('my_file.json')

打印 df.show()

虽然打印语句吐出了这个:

+---------------+
|_corrupt_record|
+---------------+
|              |
|       "a": 1, |
|         "b": 2|
|              |
+---------------+

任何人都知道发生了什么以及为什么它没有正确解释文件?

【问题讨论】:

【参考方案1】:

如果您想保持 JSON 文件原样(不删除换行符 \n),请包含 multiLine=True 关键字参数

sc = SparkContext() 
sqlc = SQLContext(sc)

df = sqlc.read.json('my_file.json', multiLine=True)

print df.show()

【讨论】:

【参考方案2】:

您需要在输入文件中的每一行有一个 json 对象,请参阅http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

如果您的 json 文件看起来像这样,它将为您提供预期的数据帧:

 "a": 1, "b": 2 
 "a": 3, "b": 4 

....
df.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

【讨论】:

如果我的 JSON 文件很大(有 100K 行)并且记录之间有很多新行(列或特征),我该如何解决?谢谢。 也许使用jq 重新格式化(压缩)文件? @M.Rez @ttimasdf 即使用jq -c 选项。【参考方案3】:

在 Spark 2.2+ 中,您可以使用以下命令读取多行的 json 文件。

val dataframe = spark.read.option("multiline",true).json( " filePath ")

如果每行有json对象,则

val dataframe = spark.read.json(filepath)

【讨论】:

这是 scala,不是 python。【参考方案4】:

添加到@Bernhard 的精彩回答

# original file was written with pretty-print inside a list
with open("pretty-printed.json") as jsonfile:
    js = json.load(jsonfile)      

# write a new file with one object per line
with open("flattened.json", 'a') as outfile:
    for d in js:
        json.dump(d, outfile)
        outfile.write('\n')

【讨论】:

【参考方案5】:

我想分享我的经验,我有一个 JSON 列 String 但使用 Python 表示法,这意味着我有 None 而不是 nullFalse 而不是 falseTrue 而不是 @ 987654326@.

解析此列时,spark 会返回一个名为 _corrupt_record 的列。所以在解析 JSON 字符串之前我必须做的是用标准 JSON 表示法替换 Python 表示法:

df.withColumn("json_notation",
    F.regexp_replace(F.regexp_replace(F.regexp_replace("_corrupt_record", "None", "null"), "False", "false") ,"True", "true")

经过此转换后,我可以在 json_notation 列上使用函数 F.from_json(),并且 Pyspark 能够正确解析 JSON 对象。

【讨论】:

以上是关于将 JSON 文件读入 Spark 时出现 _corrupt_record 错误的主要内容,如果未能解决你的问题,请参考以下文章

将 json 文件读入 CoreData 时出现精度错误

将 json 文件读入 Spark DataFrame

在 Spark 2.0 中访问向量列时出现 MatchError

尝试将 JSON 文件展平为 CSV 时出现错误消息

使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?

Swift 解析 JSON 时出现问题:无法将“__NSCFDictionary”类型的值转换为“NSArray”错误