Spark 如何处理涉及 JDBC 数据源的故障场景?

Posted

技术标签:

【中文标题】Spark 如何处理涉及 JDBC 数据源的故障场景?【英文标题】:How does Spark handle failure scenarios involving JDBC data source? 【发布时间】:2019-01-09 21:46:40 【问题描述】:

我正在编写一个与 Spark 的 JDBC 数据源实现有相似之处的数据源,我想问一下 Spark 如何处理某些故障场景。据我了解,如果执行器在运行任务时死亡,Spark 将恢复执行器并尝试重新运行该任务。然而,这在数据完整性和 Spark 的 JDBC 数据源 API(例如df.write.format("jdbc").option(...).save())的上下文中如何发挥作用?

在JdbcUtils.scala的savePartition函数中,我们看到Spark调用了用户提供的数据库url/凭据生成的Java连接对象的提交和回滚函数(见下文)。但是,如果执行器在 commit() 完成后或调用 rollback() 之前立即死亡,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,实质上是在数据库中创建重复的已提交行?如果 executor 在调用 commit() 或 rollback() 的过程中死亡会发生什么?

try 
    ...
    if (supportsTransactions) 
        conn.commit()
    
    committed = true
    Iterator.empty
 catch 
    case e: SQLException =>
        ...
        throw e
 finally 
    if (!committed) 
        // The stage must fail.  We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) 
          conn.rollback()
        
        conn.close()
     else 
        ...
    

【问题讨论】:

【参考方案1】:

但是,如果执行器在 commit() 完成后或调用 rollback() 之前立即死亡,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,从而在数据库中创建重复的已提交行?

既然 Spark SQL(它是一种基于 RDD API 的高级 API)对 JDBC 或任何其他协议的所有特性知之甚少,您会期待什么?更不用说底层执行运行时,即 Spark Core。

当您编写像 df.write.format(“jdbc”).option(...).save() 这样的结构化查询时,Spark SQL 会使用类似程序集的低级 RDD API 将其转换为分布式计算。由于它试图包含尽可能多的“协议”(包括 JDBC),Spark SQL 的 DataSource API 将大部分错误处理留给了数据源本身。

调度任务(不知道甚至不关心任务做什么)的 Spark 核心只是监视执行,如果任务失败,它将尝试再次执行它(直到默认尝试 3 次失败)。

因此,当您编写自定义数据源时,您会知道如何进行练习,并且必须在代码中处理此类重试。

处理错误的一种方法是使用TaskContext(例如addTaskCompletionListeneraddTaskFailureListener)注册任务侦听器。

【讨论】:

【参考方案2】:

出于所描述的原因,我不得不引入一些重复数据删除逻辑。实际上,您最终可能会两次(或更多)提交相同的内容。

【讨论】:

您能否描述一下您是如何实现这种重复数据删除逻辑的?我很好奇您添加它的位置以及您使用的数据库类型等细节。 这是一个 Amazon RedShift,我必须添加额外的时间戳列,以便稍后在 ETL 过程中进行重复数据删除。一般来说,您只需要一些唯一的键就可以判断它是新记录还是重复记录,显然 RedShift 对键不是很好,因此需要时间戳。您的数据目标是什么? 谢谢!我正在尝试使用 SQL Server。我对 Redshift 了解不多,但我会调查一下是否可以实施类似的策略。 使用 SQL Server 实际上更简单 - 看看这里***.com/a/21220868/5478299。

以上是关于Spark 如何处理涉及 JDBC 数据源的故障场景?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 如何处理不适合内存的数据?

电脑显示这是啥意思,如何处理

spark如何处理非数值的聚合最大值? [复制]

spark结构化流作业如何处理流-静态DataFrame连接?

如何处理 Spark 中的多个 csv.gz 文件?

通过JDBC处理数据库时如何处理夏令时?