Spark sql 查询导致分区计数膨胀

Posted

技术标签:

【中文标题】Spark sql 查询导致分区计数膨胀【英文标题】:Spark sql query causes partition count inflation 【发布时间】:2016-03-15 15:50:22 【问题描述】:

Spark (1.5.2) 中的分区计数在某些 sql 查询中会爆炸。

可以避免吗?

就我而言,我有三个表(文本、所有者、人员),我在这些表上执行以下查询:

sqlContext.sql(
                "SELECT t.* FROM texts t JOIN ("+
                        "SELECT o.TextId FROM "+
                        "owners o JOIN ("+
                        "SELECT UserId FROM person WHERE LOWER(name) "+
                        "RLIKE '.*"+escapeRegex(filter.name)+"\\s*$'"+
                        ") p ON (o.UserId=p.UserId) GROUP BY o.TextId"+
                        ") o "+
                        "ON (t.TextId = o.TextId)")

查询前的分区数为2,之后使用textsDF.javaRDD().partitions().size()得到200

【问题讨论】:

嗨乔纳森,你找到你要找的东西了吗? Jonathan,默认为 200。查看此链接 spark.apache.org/docs/latest/sql-programming-guide.html 并搜索该属性。 为了避免使用默认值,您应该按照我的建议在代码中设置您自己的属性以降低值(到 2 或 4)。然后你会得到更少的分区。 【参考方案1】:

Join/Group by 或任何具有 shuffle 的操作的分区数取决于属性“spark.sql.shuffle.partitions”。这必须在您的集群配置中设置为 200。

这个属性的重要性:这决定了对数据的reducer(种类,要理解)操作的数量。通过将此属性设置得更高,您可以确保存在大量的并行度。

无论如何,您都可以根据需要更改该属性。您可以将 SparkConf 设置如下,任意编号。

conf.set("spark.sql.shuffle.partitions","2");

注意:将其设置为较低会降低性能,从而增加网络使用量并降低并行度。

另一方面,文件读取的并行度取决于默认的并行度属性,该属性告诉您每个核心的任务数/ hdfs 数据中的块数。但是对于任何有 shuffle 的操作,它都取决于我提到的属性。

【讨论】:

spark.sql.shuffle.partitions 不是由我的代码设置的,也不是在环境中设置的。我使用.setMaster("local[4]") 来初始化我的配置。尽管如此,sqlContext.read().json(...) 只创建了 2 个分区。这就是我在DataFrame 上使用.repartition(4) 的原因。第一个查询保持分区计数不变。随后的第二个查询将计数更改为 200。在当前项目中,使用多个分区会导致性能很差(即使我在非本地运行),因为每个分区都有很大的初始化开销(.mapPartitions())。 Jonathan,默认为 200。查看此链接 spark.apache.org/docs/latest/sql-programming-guide.html 并搜索该属性。 是的,这行得通。谢谢!我仍然对这种行为感到非常惊讶。我本来希望它使用已经存在的线程/分区进行洗牌,或者使用合理的系统相关常数,如核心数或'setMaster("local[.]")`中所述的核心数。 200 似乎是一个非常随意的选择。 实际使用的并行度取决于您可用的架构。我可以猜出这个数字的动机:洗牌会导致数据的完全重组,所以人们不妨使用它来安排数据以实现良好的并行性。但是常数只是一个糟糕的选择。如果您的内核和初始化开销很少(例如我的情况),您的性能就会很差。如果您有很多内核 (>>200),您实际上会降低并行度。 是的。这样就对了。不仅如此,如果有人没有使用或玩过该属性,则很难识别..

以上是关于Spark sql 查询导致分区计数膨胀的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL分区感知查询hive表

sql 在 spark sql 数据帧查询中使用分区

SQL 查询:分区计数因变量或硬编码参数而异

spark sql 无法在 S3 中查询镶木地板分区

Spark SQL(通过 HiveContext 进行 Hive 查询)总是创建 31 个分区

Spark SQL 用于从两个不同的查询中划分计数并将输出存储为 Double