Spark 将自定义模式应用于 DataFrame

Posted

技术标签:

【中文标题】Spark 将自定义模式应用于 DataFrame【英文标题】:Spark apply custom schema to a DataFrame 【发布时间】:2018-10-25 21:10:45 【问题描述】:

我在 Parquet 文件中有一个数据,并希望对其应用自定义架构。

我在 Parquet 中的初始数据如下,

root
 |-- CUST_ID: decimal(9,0) (nullable = true)
 |-- INACTV_DT: string (nullable = true)
 |-- UPDT_DT: string (nullable = true)
 |-- ACTV_DT: string (nullable = true)
 |-- PMT_AMT: decimal(9,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = true)

我的自定义架构如下,

root
 |-- CUST_ID: decimal(38,0) (nullable = false)
 |-- INACTV_DT: timestamp (nullable = false)
 |-- UPDT_DT: timestamp (nullable = false)
 |-- ACTV_DT: timestamp (nullable = true)
 |-- PMT_AMT: decimal(19,4) (nullable = true)
 |-- CMT_ID: decimal(38,14) (nullable = false)

下面是我将新数据框应用到它的代码

val customSchema = getOracleDBSchema(sparkSession, QUERY).schema
val DF_frmOldParkquet = sqlContext_par.read.parquet("src/main/resources/data_0_0_0.parquet")
val rows: RDD[Row] = DF_frmOldParkquet.rdd
val newDataFrame = sparkSession.sqlContext.createDataFrame(rows, tblSchema)
newDataFrame.printSchema()
newDataFrame.show()

执行此操作时出现以下错误。

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of timestamp
staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(38,0), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, CUST_ID), DecimalType(38,0)), true) AS CUST_ID#27

【问题讨论】:

你能找到解决这个问题的方法吗??如果您对此有任何解决方案,请发布。 【参考方案1】:

Spark SQL中有两种主要的schema应用

schema 参数传递给DataFrameReaderschema method,用于转换某些格式的数据(主要是纯文本文件)。在这种情况下,架构可用于自动转换输入记录。 schema 参数传递给SparkSessioncreateDataFrame(采用RDDListRows 的变体)。在这种情况下,架构必须符合数据,并且不用于强制转换。

以上均不适用于您的情况:

输入是强类型的,因此schema(如果存在)会被阅读器忽略。

Schema 与数据不匹配,因此不能用于createDataFrame

在这种情况下,您应该将每列cast 设置为所需的类型。假设类型是兼容的,这样的事情应该可以工作

val newDataFrame = df.schema.fields.foldLeft(df) 
  (df, s) => df.withColumn(s.name, df(s.name).cast(s.dataType))     

根据数据的格式,这可能足够或不够。例如,如果应转换为时间戳的字段不使用标准格式,则转换将不起作用,您必须使用 Spark 日期时间处理实用程序。

【讨论】:

以上是关于Spark 将自定义模式应用于 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

将自定义函数应用于 spark 数据框组

使用冰山表格式将自定义元数据添加到 DataFrame 模式

将模型分数应用于 Spark DataFrame - Python

将 UDF 应用于 Spark Dataframe 中的多个列

将模式应用于 Java 对象的 Spark 数据集

如何使用 toDF() 将自定义 Java 类对象的 RDD 转换为 DataFrame?