Spark:将 JSON 文件转换为正确的格式

Posted

技术标签:

【中文标题】Spark:将 JSON 文件转换为正确的格式【英文标题】:Spark: Transforming JSON files to correct format 【发布时间】:2018-04-27 16:05:18 【问题描述】:

我有 100+ 百万条记录存储在具有以下 JSON 结构的文件中(实际数据有更多的列、行并且也是嵌套的)

"id":"2-2-3","key":"value""id":"2-2-3","key":"value""id":"2-2-3","key":"value""id":"2-2-3","key":"value""id":"2-2-3","key":"value"

sqlContext.read.json 函数无法解析这个,因为记录不是在多行上而是在一个大行上。下面的解决方案解决了这个问题,但却是一个很大的性能杀手。在性能方面,在 Apache Spark 中处理此问题的最佳方法是什么?

val rdd = sc.wholeTextFiles("s3://some-bucket/**/*")
val validJSON = rdd.flatMap(_._2.replace("", "\n").split("\n"))

val df = sqlContext.read.json(validJSON)

df.count()
df.select("id").show()

【问题讨论】:

输入文件的平均大小是多少? @user9613318 10mb,总共将近一百万个文件。 【参考方案1】:

这是Antot's answer 的即兴演奏,它应该处理嵌套的 JSON

input.toVector
    .foldLeft((false, Vector.empty[Char], Vector.empty[String])) 
      case ((true, charAccum, strAccum), '') => (false, Vector(''), strAccum :+ charAccum.mkString);
      case ((_, charAccum, strAccum), '')    => (true, charAccum :+ '', strAccum);
      case ((_, charAccum, strAccum), char)   => (false, charAccum :+ char, strAccum)
    
    ._3

基本上它所做的是将数据拆分为Vector[Char],并使用foldLeft 将输入聚合为子字符串。诀窍是跟踪有关前一个字符的足够信息,以确定 是否标志着新对象的开始。

我使用这个输入来测试它(基本上是 OP 的示例输入,其中包含一个嵌套对象):

val input = """"id":"2-2-3","key": "test": "value""id":"2-2-3","key":"value""id":"2-2-3","key":"value""id":"2-2-3","key":"value""id":"2-2-3","key":"value""""

得到了这个结果,看起来不错:

Vector("id":"2-2-3","key": "test": "value", 
       "id":"2-2-3","key":"value", 
       "id":"2-2-3","key":"value", 
       "id":"2-2-3","key":"value")

【讨论】:

【参考方案2】:

原始方法的问题是调用_._2.replace("", "\n",它从输入字符串创建另一个巨大的字符串,插入新的行字符,然后再次拆分成一个数组。

通过最小化中间字符串的创建并尽快检索目标字符串,可以进行改进。为此,我们可以玩一下子字符串:

val validJson = rdd.flatMap(rawJson => 

  // functions extracted to make it more readable.
  def nextObjectStartIndex(fromIndex: Int):Int = rawJson._2.indexOf('', fromIndex)
  def currObjectEndIndex(fromIndex: Int): Int = rawJson._2.indexOf('', fromIndex)
  def extractObject(fromIndex: Int, toIndex: Int): String = rawJson._2.substring(fromIndex, toIndex + 1)

  // the resulting strings are put in a local buffer
  val buffer = new ListBuffer[String]()

  // init the scanning of the input string
  var posStartNextObject = nextObjectStartIndex(0)

  // main loop terminates when there are no more '' chars
  while (posStartNextObject != -1) 
    val posEndObject = currObjectEndIndex(posStartNextObject)
    val extractedObject = extractObject(posStartNextObject, posEndObject)
    posStartNextObject = nextObjectStartIndex(posEndObject)
    buffer += extractedObject
  

  buffer
)

请注意,这种方法只有在输入 JSON 中的对象没有嵌套时才有效,假设所有花括号将同一级别的对象分开。

【讨论】:

感谢您提供出色的答案和有趣的解决方案。不幸的是,这个 JSON 是嵌套的,我会更新我的问题。

以上是关于Spark:将 JSON 文件转换为正确的格式的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark SQL 中,将 JSON 键名转换为值

使用 Java 将 Json 对象转换为 Parquet 格式而不转换为 AVRO(不使用 Spark、Hive、Pig、Impala)

使用 Spark 处理 txt 文件

如何将 JSON 格式的数据展平为 spark 数据框

如何将 Array[String] 转换为 spark Dataframe 以保存 CSV 文件格式? [复制]

使用 pyspark 将 spark 数据帧转换为嵌套 JSON