如何在 Spark 上平均分配 Beam 任务?

Posted

技术标签:

【中文标题】如何在 Spark 上平均分配 Beam 任务?【英文标题】:How to distribute Beam Tasks evenly on Spark? 【发布时间】:2017-03-29 21:04:03 【问题描述】:

我有一个简单的管道,它可以读取文本文件和 mysql 的记录并尝试协调它们,即当数据库中不存在记录时插入记录,使用文件更新数据库中的记录,并执行其他操作更新文件中不存在的数据库中的记录。

在 Spark 中使用 2M 记录运行时出现的问题如下:

我的预感是下面的代码会产生这种不平衡

        final TupleTag<FileRecord> fileTag = new TupleTag<>();
        final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>();
        PCollection<KV<Integer, CoGbkResult>> joinedRawCollection =
                KeyedPCollectionTuple.of(fileTag, fileRecords)
                        .and(mysqlTag, mysqlRecords)
                        .apply(CoGroupByKey.create());

这是 Spark Executor DAG 可视化

最终,一个工人将耗尽内存。我知道在 Spark 中,可以指定分区器来帮助将工作负载分配给工作人员。但是,我如何在 Beam 中做到这一点?

编辑:

我怀疑 JDBCIo 无法正确分发一个查询,因此我将其拆分为多个 PCollection,然后将它们展平。我从 Mysql 中读取速度更快,但最终遇到了同样的问题。

以下是正在进行的阶段:

但是每个阶段仍然受到这种不平衡的影响?:

【问题讨论】:

实际上,这种不平衡的原因可能是因为它是来自 MySQL 的读取步骤,具有那么多记录。由于 JDBCIO 可能不会分发一个 SELECT 查询,因此我们看到了这种争用。让我试着把它分开。 【参考方案1】:

通过意识到我自己未能区分 Spark Stages 和任务来回答我自己的问题。任务确实分散了,只是我实际上没有为驱动程序分配足够的内存。

【讨论】:

以上是关于如何在 Spark 上平均分配 Beam 任务?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 任务内存分配

Spark常规性能调优

Spark 常规性能调优

Hadoop YARN 集群/Spark 和 RAM 磁盘

如果分配更多内核,单个 Spark 任务会在计算上消耗更多时间

如何将 zip 文件的内容分配给 Spark 中的每个任务?