如何正确处理 spark.sql.AnalysisException

Posted

技术标签:

【中文标题】如何正确处理 spark.sql.AnalysisException【英文标题】:How to correctly handle a spark.sql.AnalysisException 【发布时间】:2018-07-01 15:43:22 【问题描述】:

我一直在使用 Spark Dataset API 对 JSON 执行操作,以根据需要提取某些字段。但是,当我提供的让 spark 知道要提取哪个字段的规范出错时,spark 会吐出一个

org.apache.spark.sql.AnalysisException

如何在这样的分布式处理场景中处理未经检查的运行时异常?我知道抛出一个 try-catch 会让事情得到排序,但是处理这种情况的推荐方法是什么

dataset = dataset.withColumn(current, functions.explode(dataset.col(parent + Constants.PUNCTUATION_PERIOD + child.substring(0, child.length() - 2))));

【问题讨论】:

如果被抓住了,你会怎么做? 将异常包装在我的自定义异常之一中并从那里抛出,以供调用对象用来处理异常 但实际上并不意味着停止、修复和重新运行。这更符合我的观点。 嗯,是的,这是一种方法,但是用我自己的异常包装诸如此类的运行时异常是个好主意?? 不,没有争议——这很好,但结果是我们可能仍然需要在修复后重新运行。至少在 BI 和 DWH 中一直如此。 【参考方案1】:

在 scala 中,您应该简单地将调用包装在 Try 中并管理失败。比如:

val result = Try(executeSparkCode()) match 
    case s: Success(_) => s;
    case Failure(error: AnalysisException) => Failure(new MyException(error));

注意 1:如果您的问题暗示如何在 scala 中管理异常,则有很多关于此主题的文档和帖子(即不要抛出)。比如可以查看that answer (of mine)

注意 2:我这里没有 scala 开发环境,所以我没有测试这段代码)


然而,在 java 中存在一个棘手的情况:编译器不希望出现未经检查的 AnalysisException,因此您无法专门捕获此异常。可能是一些 scala/java 误解,因为 scala 不跟踪检查的异常。我所做的是:

try
    return executeSparkCode();
 catch (Exception ex) 
    if(ex instanceOf AnalysisException)
        throw new MyException(ex);
     else 
        throw ex; // unmanaged exceptions
    

注意:在我的例子中,我还测试了我必须管理的特定异常的错误消息内容(即“路径不存在”),在这种情况下,我返回一个空数据集而不是引发另一个异常。我一直在寻找更好的解决方案,碰巧到了这里......

【讨论】:

嗨。这是我第一份工作刚开始读本科的时候。我使用了一个多捕获,最后一个捕获是异常类。它已经在 prod 上运行了很长一段时间,当出现问题时会提供相当不错的信息/堆栈跟踪。 我正在尝试/捕捉try /*spark dataframe filtering query*/ catch case e: Throwable => println(e.getMessage) 并看到它仍然抛出AnalysisException 并崩溃:( 这不是Throwable 从你所说的来看,它应该可以工作。也许问题出在其他地方,但我需要查看更多代码才能知道。您是否检查了堆栈跟踪以确保在 try/catch 中引发了异常?例如:火花是懒惰的,错误发生在调用操作方法(写入、收集)上,而不是在过滤调用中,即使问题在过滤器调用中

以上是关于如何正确处理 spark.sql.AnalysisException的主要内容,如果未能解决你的问题,请参考以下文章

如何以正确的方式处理结果集?

如何正确处理 JWT 刷新?

如何正确处理 typescript 中的 promisifyAll?

如何正确处理服务器端错误?

如何正确处理位置服务切换

如何正确处理 UnsafeMutablePointer