Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类
Posted
技术标签:
【中文标题】Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类【英文标题】:Spark SQL - Read csv into Dataset[T] where T is a case class of Option[BigDecimal] field 【发布时间】:2018-03-26 12:10:09 【问题描述】:我之前已将 Dataset[T] 写入 csv 文件。
在这种情况下,T 是一个包含字段 x 的案例类:Option[BigDecimal]
当我尝试将文件加载回 Dataset[T] 时,我看到以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up cast `x` from double to decimal(38,18) as it may truncate.
我猜原因是推断的架构包含一个双精度列而不是 BigDecimal 列。有没有办法解决这个问题?我希望避免基于列名进行强制转换,因为读取的代码是通用函数的一部分。我的阅读代码如下:
val a = spark
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(file)
.as[T]
我的案例类反映了从 JDBC 读取的表,其中 Option[T]
用于表示可空字段。 Option[BigDecimal]
用于从 JDBC 接收 Decimal 字段。
当我在本地机器上读/写时,我已经添加了一些代码来读取/写入 csv 文件,以便我可以轻松地检查内容。
所以我的下一个尝试是这样的:
var df = spark
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.schema(implicitly[Encoder[T]].schema)
.load(file)
val schema = df.schema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
schema.foreach field =>
field.dataType match
case t: DoubleType =>
df = df.withColumn(field.name,
col(field.name).cast(DecimalType(38,18)))
case _ => // do nothing
df.as[T]
不幸的是,我的案例类现在包含所有 None
s 而不是预期的值。如果我只是将 csv 作为具有推断类型的 DF 加载,则所有列值都已正确填充。
看起来我实际上有两个问题。
-
从 Double 转换 -> BigDecimal。
可空字段未包装在选项中。
我们将不胜感激地收到任何帮助/建议。如果从 csv 文件轻松写入/读取 Options/BigDecimals 存在问题,我很乐意调整我的方法。
【问题讨论】:
嗨@Terry 两个问题?您是否有任何特定原因不想转换为 BigDecimal?你能发布一些示例数据吗? 我已经更新了我的原始帖子以提供更多信息。 你好@Terry 你也能发布那个案例类吗? 案例类看起来像 MyCaseClass(a: String, b Option[BigDecimal]) 其中 a id 不可为空,b 可空等。没有涉及时髦的类型,标准产品编码器似乎从 JDBC 读取数据集的工作 所以如果你将类更改为 MyCaseClass(a: String, b Option[Double]) 是否有效? 【参考方案1】:首先我会用 dfB.na.fill(0.0) 填充空值,然后我会尝试下一个解决方案:
case class MyCaseClass(id: String, cost: Option[BigDecimal])
var dfB = spark.createDataset(Seq(
("a", Option(12.45)),
("b", Option(null.asInstanceOf[Double])),
("c", Option(123.33)),
("d", Option(1.3444))
)).toDF("id", "cost")
dfB
.na.fill(0.0)
.withColumn("cost", col("cost").cast(DecimalType(38,18)))
.as[MyCaseClass]
.show()
首先将列成本显式转换为 DecimalType(38,18),然后检索数据集 [MyCaseClass]。我认为这里的问题是 spark 无法在不明确指定比例精度的情况下将 double 转换为 BigDecimal,因此您需要首先将其转换为特定的十进制类型,然后将其用作 BigDecimal。
更新: 我稍微修改了前面的代码,以便也可以处理 Option[BigDecimal] 类型的成员
祝你好运
【讨论】:
以上是关于Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类的主要内容,如果未能解决你的问题,请参考以下文章