Spark 12 GB 数据加载与 Window 函数性能问题

Posted

技术标签:

【中文标题】Spark 12 GB 数据加载与 Window 函数性能问题【英文标题】:Spark 12 GB data load with Window function Performance issue 【发布时间】:2017-12-10 03:27:17 【问题描述】:

我正在使用 sparksql 转换 12 GB 数据。我的转换是在一个字段上应用带有分区的行号函数,然后将数据分成两组,第一组其中行号为 1,第二组包括其余数据,然后写入数据到 30 个分区中的目标位置。

我的工作目前大约需要 1 小时。我想在 10 分钟内完成。

我正在使用规格(16 核和 32 GB RAM)的 3 节点集群上运行此作业。 节点 1 纱线主节点。 节点 2 两个执行器 1 个驱动程序和 1 个其他 节点 3 两个执行器都用于处理。 每个执行器分配有 5 个内核和 10GB 内存。

我的硬件够用还是我需要更强大的硬件? 执行器配置正确吗? 如果硬件和配置都不错,那我肯定需要改进我的代码。

我的代码如下。

sqlContext=SQLContext(sc)

SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()

SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))

initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')


initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')

sc.stop()

提前感谢您的帮助

【问题讨论】:

也许你可以发布代码,这样可能更容易评论。 上传了我的代码 根据coalesce,见***.com/questions/44494656/… 【参考方案1】:

你在写之前有一个coalesce(1),这是什么原因? Coalesce 减少了该阶段的并行化,在您的情况下,这将导致窗口查询在 1 个核心上运行,因此您将失去每个节点 16 个核心的优势。

移除合并,这应该会开始改进。

【讨论】:

当我在小数据集上测试时,删除 coalesce(1) 会降低性能,但那是在具有两个核心的非常小的集群上。我应该删除它还是应该将它增加到 coalesce(12) 或其他东西 你可以调整spark.sql.shuffle.partitions,而不是合并,默认为 200。在非常小的数据集上,是的,这个值可能太高了。对于您的实际数据集,请尝试不使用 coalesce 并保留默认的 spark.sql.shuffle.partitions。然后让我们知道它是如何运行的。 移除合并后花了 1.5 小时 id 列的基数是多少? SalesDf.groupBy("id").count().show() 另外,您的示例代码中有 2 个保存操作,这是故意的还是只是复制粘贴错误? 我在写时尝试了 coalesce(12),它在 15 分钟内执行。关于你的问题,我写了两次它不是复制粘贴错误。【参考方案2】:

以下是我们为提高代码性能而实施的更改。

我们删除了 coalesce 并使用了 repartition(50)。我们在括号中尝试了更高和更低的数字,但 50 是我们案例中的优化数字。 我们使用 s3 作为我们的目标,但由于在 spark 中重命名事物,我们花费了很多,所以我们使用 HDFS 代替,我们的工作时间减少到以前的一半。 总体而言,通过上述更改,我们的代码运行了 12 分钟,之前是 50 分钟。 谢谢 阿马尔

【讨论】:

而不是repartition,您可以尝试将spark.sql.shuffle.partitions 设置为50 看看它是如何运行的吗?这会将您的洗牌减少到只有 1 个窗口。此外,如果您正在写入 s3,则应将文件输出提交程序版本设置为 2,请参阅此处docs.databricks.com/spark/latest/faq/… 当然我会尝试这个改变并分享它是如何运行的

以上是关于Spark 12 GB 数据加载与 Window 函数性能问题的主要内容,如果未能解决你的问题,请参考以下文章

spark如何加载大于集群磁盘大小的输入文件?

Spark DataSet 过滤器性能

Spark结构化流内存绑定

Apache Spark 如何处理不适合内存的数据?

在 Spark 中高效地连接一个大表(1TB)和另一个小表(250GB)

用Spark读取庞大的CSV文件