过滤时Spark sql“期货在300秒后超时”

Posted

技术标签:

【中文标题】过滤时Spark sql“期货在300秒后超时”【英文标题】:Spark sql "Futures timed out after 300 seconds" when filtering 【发布时间】:2017-04-27 19:02:17 【问题描述】:

在执行看似简单的 spark sql 过滤工作时出现异常:

    someOtherDF
      .filter(/*somecondition*/)
      .select($"eventId")
      .createOrReplaceTempView("myTempTable")

    records
      .filter(s"eventId NOT IN (SELECT eventId FROM myTempTable)")

知道如何解决这个问题吗?

注意:

someOtherDF 在过滤后包含约 1M 到 5M 行,并且 eventId 是 guid。 记录包含 40M 到 50M 行。

错误:

Stacktrace:

org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
    ... 84 more

【问题讨论】:

我们遇到了类似的“期货超时”问题 (SPARK-20784),但查询模式却截然不同。您使用的是什么类型的部署?纱线客户端?其他 ?涉及的数据集是否已缓存? 在这个特定场景中,数据集没有被缓存,因为它是单通道转换。这些测试是在纱线上运行的;如果我没记错的话,客户端或集群模式并不重要。 我终于找到了我的原因,这是由于驱动程序上的OOM(大部分时间是无声的,只是在日志流中偶然看到了异常) 【参考方案1】:

使用以下作品: 1)How to exclude rows that don't join with another table? 2)Spark Duplicate columns in dataframe after join

我可以使用这样的左外连接来解决我的问题:

    val leftColKey = records("eventId")
    val rightColKey = someOtherDF("eventId")
    val toAppend: DataFrame = records
      .join(someOtherDF, leftColKey === rightColKey, "left_outer")
      .filter(rightColKey.isNull) // Keep rows without a match in 'someOtherDF'. See (1)
      .drop(rightColKey) // Needed to discard duplicate column. See (2)

性能非常好,不会出现“未来超时”问题。

编辑

正如一位同事向我指出的,“leftanti”连接类型更有效。

【讨论】:

以上是关于过滤时Spark sql“期货在300秒后超时”的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark sql 中按二进制类型过滤

4.2 Spark SQL DataFrame 编程操作大全 (超详细)

Spark.sql 按 MAX 过滤行

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器

Spark SQL Dataframe API - 动态构建过滤条件

在 SQL/Spark 中使用窗口函数执行特定过滤器