加入后引发明显的性能问题
Posted
技术标签:
【中文标题】加入后引发明显的性能问题【英文标题】:Spark distinct performance issue after joining 【发布时间】:2017-03-22 10:12:43 【问题描述】:我正在阅读一个镶木地板文件
val df1 = sqlContext.read.parquet("")
df1.count = 5000000 records
val df2 = df1.select("id","aid", "DId" , "dd" , "mm", "yy","TO")
.distinct()
.groupBy("id","aid", "DId" , "dd" , "mm","yy","TO")
.count()
.filter($"count" > 1)
.select("id","aid", "DId" , "dd" , "mm", "yy")
val df3 = df2.join(df1,
df2("cid") <=> df1("cid") &&
df2("aid") <=> df1("aid") &&
df2("did") <=> df1("did") &&
df2("dd") <=> df1("dd") &&
df2("mm") <=> df1("mm") &&
df2("yy") <=> df1("yy"), "inner")
.distinct()
当我执行 df3.count ... 它花费了将近 897517 毫秒,这太高了,它会影响整体作业执行,并且有时作业会中止如果我增加超时选项,它是执行但延迟太高。我需要建议来改进这一点
【问题讨论】:
关于df2
的定义代码,我假设 df2 将始终为空。您执行 distinct() ,每个“键”将返回一行。然后 filter($"count" > 1) 将消除每一行。那是你想做的吗?
我的意图类似于sql查询SELECT cid,aid, DId, dd, mm, yy FROM (SELECT DISTINCT cid,aid, DId, dd, mm, yy, TO FROM nsc WHERE cid = 1) t GROUP BY cid,aid, DId, dd, mm, yy HAVING COUNT(*) > 1;
我将其转换为dataframe
distinct 是一个非常昂贵的操作,如果可能的话尽量避免它。如果您不需要确切的数字,请尝试将 .distinct().count() 替换为 .approx_count_distinct(...)。如果您确实需要确切的数字 - 使用 .countDistinct(...) - 我相信它会更快。请注意,在这两种情况下,您都需要导入函数。_ before
【参考方案1】:
根据您的评论:
我的意图类似于 sql Query SELECT cid,aid, DId, dd, mm, yy FROM (SELECT DISTINCT cid,aid, DId, dd, mm, yy, TO FROM nsc WHERE cid = 1) t GROUP BY cid ,aid, DId, dd, mm, yy HAVING COUNT(*) > 1;我将其转换为数据框
我认为正确的代码应该是:
df
.select("id", "aid", "DId", "dd", "mm", "yy", "TO")
.distinct()
.filter(col("cid") <=> 1)
.groupBy("id", "aid", "DId", "dd", "mm", "yy")
.count()
.filter(col("count") > 1)
.select("id", "aid", "DId", "dd", "mm", "yy")
因此无需进行最终连接。它应该更快。
【讨论】:
我想知道我在问题中发布了相同的查询,它将如何消除第二次查询的需要 我不明白你的评论。你成功了吗?以上是关于加入后引发明显的性能问题的主要内容,如果未能解决你的问题,请参考以下文章