生成 Spark 模式代码/持久化和重用模式

Posted

技术标签:

【中文标题】生成 Spark 模式代码/持久化和重用模式【英文标题】:Generate Spark schema code/persist and reuse schema 【发布时间】:2020-11-11 23:17:30 【问题描述】:

我正在从 Parquet 数据源实现一些 Spark 结构化流转换。为了将数据读入流式 DataFrame,必须指定模式(不能自动推断)。架构非常复杂,手动编写架构代码将是一项非常复杂的任务。

你能建议一个绕行吗?目前我正在预先创建一个批处理 DataFrame(使用相同的数据源),Spark 推断架构,然后我将架构保存到 Scala 对象并将其用作结构化流读取器的输入。

我不认为这是一个可靠或性能良好的解决方案。请建议如何自动生成架构代码或以某种方式将架构保存在文件中并重用它。

【问题讨论】:

【参考方案1】:

来自文档:

默认情况下,来自基于文件的源的结构化流式传输需要您 指定模式,而不是依赖 Spark 来推断它 自动地。此限制可确保一致的架构 用于流式查询,即使在失败的情况下。对于临时 用例,您可以通过设置重新启用模式推断 spark.sql.streaming.schemaInferencetrue

您还可以打开一个 shell,读取启用了自动模式推断的 parquet 文件之一,然后 save the schema to JSON for later reuse。您只需执行一次此操作,因此它可能比您现在使用的听起来类似的解决方法更快/更有效。

【讨论】:

嘿,我认为将模式保存到 JSON 并重用它是一个好主意,但是您链接的示例对我不起作用。尝试使用架构时出现以下错误:使用替代方法重载方法值架构:(schemaString: String)org.apache.spark.sql.streaming.DataStreamReader (schema: org.apache.spark.sql. types.StructType)org.apache.spark.sql.streaming.DataStreamReader 不能应用于 (Option[org.apache.spark.sql.types.StructType]) 如果只使用 SparkSession.read.parquet() 并使用它的模式打开一个 spark-shell 并仅将一个 parquet 文件加载为普通的、旧的、非流式 DataFrame 怎么样?

以上是关于生成 Spark 模式代码/持久化和重用模式的主要内容,如果未能解决你的问题,请参考以下文章

Spark-数据持久化操作

不同元素的火花流和批处理模式之间的代码重用

PersistenceException:没有为名为 default 的持久性单元的模式生成找到持久性提供程序

RDD的缓存,依赖,spark提交任务流程

在关系数据库中持久化实体顺序的最佳模式是啥?

RDD缓存