Spark sql 查询导致分区计数膨胀
Posted
技术标签:
【中文标题】Spark sql 查询导致分区计数膨胀【英文标题】:Spark sql query causes partition count inflation 【发布时间】:2016-03-15 15:50:22 【问题描述】:Spark (1.5.2) 中的分区计数在某些 sql 查询中会爆炸。
可以避免吗?
就我而言,我有三个表(文本、所有者、人员),我在这些表上执行以下查询:
sqlContext.sql(
"SELECT t.* FROM texts t JOIN ("+
"SELECT o.TextId FROM "+
"owners o JOIN ("+
"SELECT UserId FROM person WHERE LOWER(name) "+
"RLIKE '.*"+escapeRegex(filter.name)+"\\s*$'"+
") p ON (o.UserId=p.UserId) GROUP BY o.TextId"+
") o "+
"ON (t.TextId = o.TextId)")
查询前的分区数为2,之后使用textsDF.javaRDD().partitions().size()
得到200
【问题讨论】:
嗨乔纳森,你找到你要找的东西了吗? Jonathan,默认为 200。查看此链接 spark.apache.org/docs/latest/sql-programming-guide.html 并搜索该属性。 为了避免使用默认值,您应该按照我的建议在代码中设置您自己的属性以降低值(到 2 或 4)。然后你会得到更少的分区。 【参考方案1】:Join/Group by 或任何具有 shuffle 的操作的分区数取决于属性“spark.sql.shuffle.partitions”。这必须在您的集群配置中设置为 200。
这个属性的重要性:这决定了对数据的reducer(种类,要理解)操作的数量。通过将此属性设置得更高,您可以确保存在大量的并行度。
无论如何,您都可以根据需要更改该属性。您可以将 SparkConf 设置如下,任意编号。
conf.set("spark.sql.shuffle.partitions","2");
注意:将其设置为较低会降低性能,从而增加网络使用量并降低并行度。
另一方面,文件读取的并行度取决于默认的并行度属性,该属性告诉您每个核心的任务数/ hdfs 数据中的块数。但是对于任何有 shuffle 的操作,它都取决于我提到的属性。
【讨论】:
spark.sql.shuffle.partitions
不是由我的代码设置的,也不是在环境中设置的。我使用.setMaster("local[4]")
来初始化我的配置。尽管如此,sqlContext.read().json(...)
只创建了 2 个分区。这就是我在DataFrame
上使用.repartition(4)
的原因。第一个查询保持分区计数不变。随后的第二个查询将计数更改为 200。在当前项目中,使用多个分区会导致性能很差(即使我在非本地运行),因为每个分区都有很大的初始化开销(.mapPartitions()
)。
Jonathan,默认为 200。查看此链接 spark.apache.org/docs/latest/sql-programming-guide.html 并搜索该属性。
是的,这行得通。谢谢!我仍然对这种行为感到非常惊讶。我本来希望它使用已经存在的线程/分区进行洗牌,或者使用合理的系统相关常数,如核心数或'setMaster("local[.]")`中所述的核心数。 200 似乎是一个非常随意的选择。
实际使用的并行度取决于您可用的架构。我可以猜出这个数字的动机:洗牌会导致数据的完全重组,所以人们不妨使用它来安排数据以实现良好的并行性。但是常数只是一个糟糕的选择。如果您的内核和初始化开销很少(例如我的情况),您的性能就会很差。如果您有很多内核 (>>200),您实际上会降低并行度。
是的。这样就对了。不仅如此,如果有人没有使用或玩过该属性,则很难识别..以上是关于Spark sql 查询导致分区计数膨胀的主要内容,如果未能解决你的问题,请参考以下文章