Spark SQL DataFrame - 异常处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL DataFrame - 异常处理相关的知识,希望对你有一定的参考价值。

在我们的应用程序中,我们的大多数代码只是在filter上应用group byaggregateDataFrame操作,并将DF保存到Cassandra数据库。

与下面的代码一样,我们有几种方法可以在不同数量的字段上执行相同类型的操作[filter, group by, join, agg]并返回DF并将其保存到Cassandra表中。

示例代码是:

 val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")

saveToCassandra(df)

def saveToCassandra(df: DataFrame) {
    try {
        df.write.format("org.apache.spark.sql.cassandra")
        .options(Map("Table" -> "tableName", "keyspace" -> keyspace)
        .mode("append").save()
    }
    catch {
        case e: Throwable => log.error(e)
    }
}

由于我通过将DF保存到Cassandra来调用该操作,我希望我只需要根据this线程处理该行的异常。

如果我得到任何异常,我可以默认在Spark详细日志中看到异常。

我是否必须真正围绕过滤器,使用Trytry , catch?进行分组

我没有看到有关异常处理的Spark SQL DataFrame API示例的任何示例。

我如何在Try方法上使用saveToCassandra?它返回Unit

答案

你真的不需要用filtergroup byTry包围trycatch代码。由于所有这些操作都是转换,因此在对它们执行操作之前它们不会执行,例如saveToCassandra

但是,如果在过滤,分组或聚合数据帧时发生错误,saveToCassandra函数中的catch子句将记录它,因为正在执行操作。

另一答案

在try catch中包含懒惰的DAG是没有意义的。 您需要在Try()中包装lambda函数。 不幸的是,AFAIK无法在DataFrames中进行行级异常处理。

您可以使用RDD或DataSet,如spache spark exception handling下面这篇文章的回答中所述

以上是关于Spark SQL DataFrame - 异常处理的主要内容,如果未能解决你的问题,请参考以下文章

Spark:创建 DataFrame 会出现异常

使用Scala在Spark中创建DataFrame时出错

[Spark][Python][DataFrame][SQL]Spark对DataFrame直接执行SQL处理的例子

spark踩坑——dataframe写入hbase连接异常

Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api

Spark编程--Spark SQL DataFrame