Spark:从异构数据中写入 Paquet

Posted

技术标签:

【中文标题】Spark:从异构数据中写入 Paquet【英文标题】:Spark: write Paquet from heterogeneous data 【发布时间】:2021-11-20 11:39:54 【问题描述】:

我有一个包含异构 JSON 的数据集,我可以按类型对其进行分组并应用 StructType。例如。 RDD[(Type, JSON)]Set[Type],包含原始 RDD 中的所有类型。

现在我想将这些 JSON 写入一个类型化的 Parquet 文件,按类型分区。

我目前做的是:

val getSchema: Type => StructType = ???
val buildRow: StructType => JSON => Row = ???

types.foreach  jsonType =>
  val sparkSchema: StructType = getSchema(jsonType)
  val rows: RDD[Row] = rdd
    .filter(k => k == jsonType)
    .map  case (_, json) => buildRow(sparkSchema)(json) 
  spark.createDataFrame(rows, sparkSchema).write.parquet(path)

它可以工作,但是效率很低,因为它需要多次遍历原始 RDD - 我通常有几十甚至几百种类型。

有没有更好的解决方案?我试图合并数据框,但它失败了,因为无法合并具有不同数量的行。或者至少我可以通过取消持久化来释放数据帧占用的资源?

【问题讨论】:

“类型”的数据类型是什么? 它是Set[Type],其中Type 是我的域特定架构——唯一重要的是我们可以从中派生StructType 【参考方案1】:

我想到了一些选项(有些可能适合也可能不适合您的要求)。

    如果不同类型不需要是单独的文件,您可以使用 type1col : StructType1, type2 : StructType2, etc 之类的架构并写出仅填充一个结构列的 parquet 文件。 重新分区数据以获取同一分区中所有相同类型的 JSON 对象(+ 某种辅助键),并写出分区数据。然后将其读入并按类型过滤(如果将其写出分区,则只需加载一次数据)。这只需要读取数据 2 倍(但 shuffle 可能会很昂贵)。 您可以创建一个自定义WriteBatch,它对(类型,JSON)的行进行操作,在写入之前应用转换并为每种类型保留一个打开的文件句柄。如果您需要单独的文件,这可能是最有效的,但需要大量代码来维护和调试。

【讨论】:

谢谢!第一个选项是我的想法 - 我非常想将数据写入单独的文件,但现在看来我必须坚持使用胖表格式。第二个选项对我们不起作用,因为这项工作对成本敏感,现在我们将数据写入 TSV,在小型集群上大约需要 2 小时,第二个选项(以及我在问题中显示的内容)需要 > 10小时。第三个选项虽然是全新的 - 我将尝试它。 如果你没有绑定到 spark(和 JVM),我也会添加,这纯粹是转换 JSON + Type => Parquet,使用像 Dask 这样的另一个框架可能更简单的代码(基于 Python)或者如果您的数据足够小,根本没有框架...... 重新分区需要很长时间的事实意味着您的数据不适合集群内存(磁盘溢出)。正如@Micah Kornfield 所建议的那样,您最好不要使用spark,或者增加集群大小。【参考方案2】:

这将有助于加快 parquet 文件的写入速度。由于typesSet[Type] 类型,foreach 循环将按顺序发生。这意味着镶木地板文件是一个一个地写入的(不是并行的)。并且由于每个parquet文件的写入是相互独立的,如果一次写入多个文件应该没有问题。

使用 scala 并行集合(参考:https://docs.scala-lang.org/overviews/parallel-collections/overview.html)将是实现它的最简单方法。 尝试将第 4 行编辑为:

types.par.foreach  jsonType =>

如果多次访问,也缓存原始RDD。

【讨论】:

以上是关于Spark:从异构数据中写入 Paquet的主要内容,如果未能解决你的问题,请参考以下文章

在函数调用时从异构初始化列表构建元组

基于Spark的异构分布式深度学习平台

数据异构缓存策略

如何在 S3 中查询异构 JSON 数据?

Informatica_组件

异构的概念?大数据量的异构处理?