在火花中加入大小不等的数据集

Posted

技术标签:

【中文标题】在火花中加入大小不等的数据集【英文标题】:Joining unequal sized data sets in spark 【发布时间】:2015-08-27 17:00:41 【问题描述】:

我有以下数据集:

Dataset 1:                 Dataset 2:                   Dataset 3:
id  field1                 l_id    r_id                 id field2

以下是它们的尺寸: 数据集1:20G 数据集2:5T 数据集3:20G

目标: 我想将 id 字段上的所有这些数据集(l_id 与 Dataset1 中的 id 和 r_id 与 Dataset 3 中的 id )与最终数据集相结合,如下所示:

l_id     r_id     field1      field2

我目前的做法: 加入 Dataset1 和 Dataset2(在 id 和 l_id 上)以生成(l_id r_id field1),然后将其与 Dataset3(在 r_id 和 id 上)结合以生成(l_id r_id field1 field2) 我假设 spark 自动使用哈希分区器查看字段被加入。 但是,这种方法会导致其中一个执行程序耗尽磁盘空间,这可能是由于洗牌的数量。

您能建议我如何加入这些数据集吗?我是否理解 spark 默认使用哈希分区器,查看正在连接的列是否正确?还是我必须先手动对数据进行分区,然后再执行连接?

请注意,广播 Dataset1/2 不是一种选择,因为它们太大了,将来可能会变得更大。此外,所有数据集都是非键值 RDD,包含的字段比此处列出的字段多。所以我不确定默认分区是如何工作的以及如何配置自定义分区器。

谢谢。

更新 1

我正在使用 hive SQL 执行所有连接,并将 spark.sql.shuffle.partitions 设置为 33000 和以下配置:

sparkConf.set("spark.akka.frameSize", "500")
sparkConf.set("spark.storage.memoryFraction", "0.2")
sparkConf.set("spark.network.timeout", "1200")
sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "10000")
sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.maxResultSize", "0")
sparkConf.set("spark.shuffle.consolidateFiles", "true")

我还可以控制所有这些数据集的生成方式。他们似乎都没有设置分区器(通过查看 rdd.partitioner),而且我在 SQLContext 中看不到任何 API,它可以让我在创建数据框时配置分区器。

我正在使用 scala 和 Spark 1.3

【问题讨论】:

你的集群配置是什么? 这是一个 100 个节点的集群,每个节点有 60G RAM 和 745G 磁盘空间。我的作业配置是20G driver-memory、20G executor-memory、2 executor-cores、120 num-executors、33000 spark.sql.shuffle.partitions和2 spark.driver.cores。 你能从失败的执行者那里发布关于洗牌的写入和日志的信息吗?还有你在 shuffle 目录中有多少空间,也许可以尝试调整 shuffle memoryFraction。 您好,shuffle目录大约有15T空间。但是,该作业导致大约 18T 的随机写入和 10T 的随机读取。我试图弄清楚是否有办法重新分区数据或批量读取更大的数据集(执行连接并合并部分数据集)以减少洗牌 【参考方案1】:

数据的分区取决于 RDD 的来源。您不需要手动重新分区数据。但是,如果您对数据进行重新分区以使它们具有相同的分区器,那么加入(& cogrouping)将导致狭窄的转换,而不必在加入时进行洗牌。请注意,在较新版本的 Spark (1.2+) 中,默认随机播放现在是基于排序的随机播放,而不是基于哈希的随机播放。

在没有代码和日志的情况下很难说如何更改您的联接(也许还可以知道 id 的分布是什么样的)。

如果出现不平衡数据的问题,您可以尝试增加分区数量(作为输入和输出)。一种可能是您的暂存空间太小,您可以将 Spark 配置为使用不同的目录来临时存储 spark.local.dir。如果您的对象是 kyro 可序列化的(或者如果您有时间添加它),您可能还想查看更改 spark.serializer,因为不同的序列化可以占用更少的空间。

虽然与作业完成没有直接关系,但您可能还希望增加 spark.shuffle.memoryFraction 并减少 spark.storage.memoryFraction 以减少洗牌期间所需的溢出到磁盘的数量。

如果您的数据结构稍有不同,一种选择是使用cogroup,它支持同时连接多个 RDD,但这要求所有键都相同。

注意:这一切都假设您使用的是原始 Spark 而不是 Spark SQL。要调整 Spark SQL 连接,请查看 https://spark.apache.org/docs/latest/sql-programming-guide.html(尤其考虑调整 spark.sql.shuffle.partitions)。

希望这会有所帮助。

【讨论】:

感谢您的回复。我正在使用 hive sql 来执行连接。我用我正在使用的配置更新了我的原始帖子。

以上是关于在火花中加入大小不等的数据集的主要内容,如果未能解决你的问题,请参考以下文章

火花数据集的转换

根据不等式条件连接两个数据集

如何在火花中遍历数据集列?

火花数据集分组和总和

如何在不转换为火花数据集的情况下遍历数据框?

火花数据集:如何从列中获取唯一值的出现次数