Spark 12 GB 数据加载与 Window 函数性能问题
Posted
技术标签:
【中文标题】Spark 12 GB 数据加载与 Window 函数性能问题【英文标题】:Spark 12 GB data load with Window function Performance issue 【发布时间】:2017-12-10 03:27:17 【问题描述】:我正在使用 sparksql 转换 12 GB 数据。我的转换是在一个字段上应用带有分区的行号函数,然后将数据分成两组,第一组其中行号为 1,第二组包括其余数据,然后写入数据到 30 个分区中的目标位置。
我的工作目前大约需要 1 小时。我想在 10 分钟内完成。
我正在使用规格(16 核和 32 GB RAM)的 3 节点集群上运行此作业。 节点 1 纱线主节点。 节点 2 两个执行器 1 个驱动程序和 1 个其他 节点 3 两个执行器都用于处理。 每个执行器分配有 5 个内核和 10GB 内存。
我的硬件够用还是我需要更强大的硬件? 执行器配置正确吗? 如果硬件和配置都不错,那我肯定需要改进我的代码。
我的代码如下。
sqlContext=SQLContext(sc)
SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()
SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))
initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
sc.stop()
提前感谢您的帮助
【问题讨论】:
也许你可以发布代码,这样可能更容易评论。 上传了我的代码 根据coalesce,见***.com/questions/44494656/… 【参考方案1】:你在写之前有一个coalesce(1)
,这是什么原因? Coalesce 减少了该阶段的并行化,在您的情况下,这将导致窗口查询在 1 个核心上运行,因此您将失去每个节点 16 个核心的优势。
移除合并,这应该会开始改进。
【讨论】:
当我在小数据集上测试时,删除 coalesce(1) 会降低性能,但那是在具有两个核心的非常小的集群上。我应该删除它还是应该将它增加到 coalesce(12) 或其他东西 你可以调整spark.sql.shuffle.partitions
,而不是合并,默认为 200。在非常小的数据集上,是的,这个值可能太高了。对于您的实际数据集,请尝试不使用 coalesce
并保留默认的 spark.sql.shuffle.partitions
。然后让我们知道它是如何运行的。
移除合并后花了 1.5 小时
id
列的基数是多少? SalesDf.groupBy("id").count().show()
另外,您的示例代码中有 2 个保存操作,这是故意的还是只是复制粘贴错误?
我在写时尝试了 coalesce(12),它在 15 分钟内执行。关于你的问题,我写了两次它不是复制粘贴错误。【参考方案2】:
以下是我们为提高代码性能而实施的更改。
我们删除了 coalesce 并使用了 repartition(50)。我们在括号中尝试了更高和更低的数字,但 50 是我们案例中的优化数字。 我们使用 s3 作为我们的目标,但由于在 spark 中重命名事物,我们花费了很多,所以我们使用 HDFS 代替,我们的工作时间减少到以前的一半。 总体而言,通过上述更改,我们的代码运行了 12 分钟,之前是 50 分钟。 谢谢 阿马尔
【讨论】:
而不是repartition
,您可以尝试将spark.sql.shuffle.partitions
设置为50 看看它是如何运行的吗?这会将您的洗牌减少到只有 1 个窗口。此外,如果您正在写入 s3,则应将文件输出提交程序版本设置为 2,请参阅此处docs.databricks.com/spark/latest/faq/…
当然我会尝试这个改变并分享它是如何运行的以上是关于Spark 12 GB 数据加载与 Window 函数性能问题的主要内容,如果未能解决你的问题,请参考以下文章