Spark:从异构数据中写入 Paquet
Posted
技术标签:
【中文标题】Spark:从异构数据中写入 Paquet【英文标题】:Spark: write Paquet from heterogeneous data 【发布时间】:2021-11-20 11:39:54 【问题描述】:我有一个包含异构 JSON 的数据集,我可以按类型对其进行分组并应用 StructType。例如。 RDD[(Type, JSON)]
和 Set[Type]
,包含原始 RDD 中的所有类型。
现在我想将这些 JSON 写入一个类型化的 Parquet 文件,按类型分区。
我目前做的是:
val getSchema: Type => StructType = ???
val buildRow: StructType => JSON => Row = ???
types.foreach jsonType =>
val sparkSchema: StructType = getSchema(jsonType)
val rows: RDD[Row] = rdd
.filter(k => k == jsonType)
.map case (_, json) => buildRow(sparkSchema)(json)
spark.createDataFrame(rows, sparkSchema).write.parquet(path)
它可以工作,但是效率很低,因为它需要多次遍历原始 RDD - 我通常有几十甚至几百种类型。
有没有更好的解决方案?我试图合并数据框,但它失败了,因为无法合并具有不同数量的行。或者至少我可以通过取消持久化来释放数据帧占用的资源?
【问题讨论】:
“类型”的数据类型是什么? 它是Set[Type]
,其中Type
是我的域特定架构——唯一重要的是我们可以从中派生StructType
。
【参考方案1】:
我想到了一些选项(有些可能适合也可能不适合您的要求)。
-
如果不同类型不需要是单独的文件,您可以使用
type1col : StructType1, type2 : StructType2, etc
之类的架构并写出仅填充一个结构列的 parquet 文件。
重新分区数据以获取同一分区中所有相同类型的 JSON 对象(+ 某种辅助键),并写出分区数据。然后将其读入并按类型过滤(如果将其写出分区,则只需加载一次数据)。这只需要读取数据 2 倍(但 shuffle 可能会很昂贵)。
您可以创建一个自定义WriteBatch,它对(类型,JSON)的行进行操作,在写入之前应用转换并为每种类型保留一个打开的文件句柄。如果您需要单独的文件,这可能是最有效的,但需要大量代码来维护和调试。
【讨论】:
谢谢!第一个选项是我的想法 - 我非常想将数据写入单独的文件,但现在看来我必须坚持使用胖表格式。第二个选项对我们不起作用,因为这项工作对成本敏感,现在我们将数据写入 TSV,在小型集群上大约需要 2 小时,第二个选项(以及我在问题中显示的内容)需要 > 10小时。第三个选项虽然是全新的 - 我将尝试它。 如果你没有绑定到 spark(和 JVM),我也会添加,这纯粹是转换 JSON + Type => Parquet,使用像 Dask 这样的另一个框架可能更简单的代码(基于 Python)或者如果您的数据足够小,根本没有框架...... 重新分区需要很长时间的事实意味着您的数据不适合集群内存(磁盘溢出)。正如@Micah Kornfield 所建议的那样,您最好不要使用spark,或者增加集群大小。【参考方案2】:这将有助于加快 parquet 文件的写入速度。由于types
是Set[Type]
类型,foreach 循环将按顺序发生。这意味着镶木地板文件是一个一个地写入的(不是并行的)。并且由于每个parquet文件的写入是相互独立的,如果一次写入多个文件应该没有问题。
使用 scala 并行集合(参考:https://docs.scala-lang.org/overviews/parallel-collections/overview.html)将是实现它的最简单方法。 尝试将第 4 行编辑为:
types.par.foreach jsonType =>
如果多次访问,也缓存原始RDD。
【讨论】:
以上是关于Spark:从异构数据中写入 Paquet的主要内容,如果未能解决你的问题,请参考以下文章