Spark SQL DataFrame - 异常处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL DataFrame - 异常处理相关的知识,希望对你有一定的参考价值。
在我们的应用程序中,我们的大多数代码只是在filter
上应用group by
,aggregate
和DataFrame
操作,并将DF保存到Cassandra数据库。
与下面的代码一样,我们有几种方法可以在不同数量的字段上执行相同类型的操作[filter, group by, join, agg
]并返回DF并将其保存到Cassandra表中。
示例代码是:
val filteredDF = df.filter(col("hour") <= LocalDataTime.now().getHour())
.groupBy("country")
.agg(sum(col("volume")) as "pmtVolume")
saveToCassandra(df)
def saveToCassandra(df: DataFrame) {
try {
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("Table" -> "tableName", "keyspace" -> keyspace)
.mode("append").save()
}
catch {
case e: Throwable => log.error(e)
}
}
由于我通过将DF保存到Cassandra来调用该操作,我希望我只需要根据this线程处理该行的异常。
如果我得到任何异常,我可以默认在Spark详细日志中看到异常。
我是否必须真正围绕过滤器,使用Try
或try , catch?
进行分组
我没有看到有关异常处理的Spark SQL DataFrame API示例的任何示例。
我如何在Try
方法上使用saveToCassandra
?它返回Unit
你真的不需要用filter
或group by
,Try
包围try
,catch
代码。由于所有这些操作都是转换,因此在对它们执行操作之前它们不会执行,例如saveToCassandra
。
但是,如果在过滤,分组或聚合数据帧时发生错误,saveToCassandra
函数中的catch子句将记录它,因为正在执行操作。
在try catch中包含懒惰的DAG是没有意义的。 您需要在Try()中包装lambda函数。 不幸的是,AFAIK无法在DataFrames中进行行级异常处理。
您可以使用RDD或DataSet,如spache spark exception handling下面这篇文章的回答中所述
以上是关于Spark SQL DataFrame - 异常处理的主要内容,如果未能解决你的问题,请参考以下文章
[Spark][Python][DataFrame][SQL]Spark对DataFrame直接执行SQL处理的例子