Pyspark - 使用累加器检查 Json 格式

Posted

技术标签:

【中文标题】Pyspark - 使用累加器检查 Json 格式【英文标题】:Pyspark - checking Json format using accumulator 【发布时间】:2018-05-07 17:04:10 【问题描述】:

如何检查 JSON 文件是否损坏,例如缺少 、、逗号或错误的数据类型。我试图通过使用累加器来实现,因为进程在多个执行器上运行。

spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

class StringAccumulatorParam(AccumulatorParam):
  def zero(self, v):
      return []
  def addInPlace(self, variable, value):
      variable.append(value)
      return variable
errorCount = ss.sparkContext.accumulator(0)
errorValues = ss.sparkContext.accumulator("", StringAccumulatorParam())

newSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
    StructField("status", BooleanType(), True)])

errorDF = ss.read.json("/Users/test.jsonl")
errorDF2 = ss.createDataFrame(errorDF, newSchema).cache()

def checkErrorCount(row):
   global errorCount
   errorDF2["id"] = row. newSchema["id"]
      errorCount.add(1)
      errorValues.add(errorDF2["id"])

errorDF.foreach(lambda x: checkErrorCount(x))
print(" rows had questionable values.".format(errorCount.value))

ss.stop()

这是损坏的 JSON 文件 -

"name":"Standards1","id":90,"status":true
"name":"Standards2","id":91
"name":"Standards3","id":92,"status":true
"name":781,"id":93,"status":true

【问题讨论】:

我可能会将文件加载为文本文件。然后编写一个用户定义函数 (udf) 来尝试将每个文本行转换为 JSON。如果成功则输出0,否则输出1。然后对结果求和。 另一种选择是读取文件两次。一次作为文本获取总行数。然后再次作为带有选项 mode=DROPMALFORMED 的 json 来获得有效计数。区别在于坏行的数量。 谢谢。我有 40 多个良好大小 (MB) 的文件,因此阅读它们两次将超过我们完成工作的时间限制。我曾尝试使用“FAILFAST”,但由于多个执行者的作业我无法捕获异常。有没有办法只获取坏行的数量? 如何将每个文本行转换为 JSON。我已经尝试过 textFile("/path/filename.jsonl") 但如何在 Python Spark (PySpark) 中将此 texFile 解析为 JSON。 【参考方案1】:

我对此进行了尝试,并提出了以下建议。

在 2 个解决方案中,我认为计数差异会更快,因为它将使用原生 Spark JSON 处理。

UDF 解决方案将在 Python 中进行 JSON 解析,这意味着您必须支付将每个文件行从 Java 传输到 Python 的成本,因此可能会更慢。

import json
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, udf
from pyspark.sql.types import LongType

application_name = 'Count bad JSON lines'
spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Difference of counts solution
input_path = '/baddata.json'
total_lines = ss.read.text(input_path).count()
good_lines = ss.read.option('mode', 'DROPMALFORMED').json(input_path).count()
bad_lines = total_lines - good_lines
print('Found  bad JSON lines in data'.format(bad_lines))

# Parse JSON with UDF solution
def is_bad(line):
    try:
        json.loads(line)
        return 0
    except ValueError:
        return 1

is_bad_udf = udf(is_bad, LongType())
lines = ss.read.text(input_path)
bad_sum = lines.select(sum(is_bad_udf('value'))).collect()[0][0]
print('Got  bad lines'.format(bad_sum))

ss.stop()

【讨论】:

以上是关于Pyspark - 使用累加器检查 Json 格式的主要内容,如果未能解决你的问题,请参考以下文章

pyspark中使用累加器Accumulator统计指标

想将key添加到pyspark dataFrame的爆炸数组中

使用 Pyspark 处理 JSON 结构

pyspark ImportError:无法导入名称累加器

PySpark 从 excel 中读取,只有一列 json 格式

PySpark 从目录中读取多个 txt 文件为 json 格式