Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类

Posted

技术标签:

【中文标题】Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类【英文标题】:Spark SQL - Read csv into Dataset[T] where T is a case class of Option[BigDecimal] field 【发布时间】:2018-03-26 12:10:09 【问题描述】:

我之前已将 Dataset[T] 写入 csv 文件。

在这种情况下,T 是一个包含字段 x 的案例类:Option[BigDecimal]

当我尝试将文件加载回 Dataset[T] 时,我看到以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up cast `x` from double to decimal(38,18) as it may truncate.

我猜原因是推断的架构包含一个双精度列而不是 BigDecimal 列。有没有办法解决这个问题?我希望避免基于列名进行强制转换,因为读取的代码是通用函数的一部分。我的阅读代码如下:

   val a = spark
    .read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file)
    .as[T]

我的案例类反映了从 JDBC 读取的表,其中 Option[T] 用于表示可空字段。 Option[BigDecimal] 用于从 JDBC 接收 Decimal 字段。

当我在本地机器上读/写时,我已经添加了一些代码来读取/写入 csv 文件,以便我可以轻松地检查内容。

所以我的下一个尝试是这样的:

   var df = spark
    .read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .schema(implicitly[Encoder[T]].schema)
    .load(file)

  val schema = df.schema

  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._

  schema.foreach field =>
    field.dataType match 
      case t: DoubleType =>
        df = df.withColumn(field.name, 
          col(field.name).cast(DecimalType(38,18)))
      case _ => // do nothing
    
  

  df.as[T]

不幸的是,我的案例类现在包含所有 Nones 而不是预期的值。如果我只是将 csv 作为具有推断类型的 DF 加载,则所有列值都已正确填充。

看起来我实际上有两个问题。

    从 Double 转换 -> BigDecimal。 可空字段未包装在选项中。

我们将不胜感激地收到任何帮助/建议。如果从 csv 文件轻松写入/读取 Options/BigDecimals 存在问题,我很乐意调整我的方法。

【问题讨论】:

嗨@Terry 两个问题?您是否有任何特定原因不想转换为 BigDecimal?你能发布一些示例数据吗? 我已经更新了我的原始帖子以提供更多信息。 你好@Terry 你也能发布那个案例类吗? 案例类看起来像 MyCaseClass(a: String, b Option[BigDecimal]) 其中 a id 不可为空,b 可空等。没有涉及时髦的类型,标准产品编码器似乎从 JDBC 读取数据集的工作 所以如果你将类更改为 MyCaseClass(a: String, b Option[Double]) 是否有效? 【参考方案1】:

首先我会用 dfB.na.fill(0.0) 填充空值,然后我会尝试下一个解决方案:

case class MyCaseClass(id: String, cost: Option[BigDecimal])
var dfB = spark.createDataset(Seq(
  ("a", Option(12.45)),
  ("b", Option(null.asInstanceOf[Double])),
  ("c", Option(123.33)),
  ("d", Option(1.3444))
)).toDF("id", "cost")

dfB
  .na.fill(0.0)
  .withColumn("cost", col("cost").cast(DecimalType(38,18)))
  .as[MyCaseClass]
  .show()

首先将列成本显式转换为 DecimalType(38,18),然后检索数据集 [MyCaseClass]。我认为这里的问题是 spark 无法在不明确指定比例精度的情况下将 double 转换为 BigDecimal,因此您需要首先将其转换为特定的十进制类型,然后将其用作 BigDecimal。

更新: 我稍微修改了前面的代码,以便也可以处理 Option[BigDecimal] 类型的成员

祝你好运

【讨论】:

以上是关于Spark SQL - 将 csv 读入 Dataset[T],其中 T 是 Option[BigDecimal] 字段的案例类的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从 Spark SQL 导出到 CSV

如何在不使用databricks csv api的情况下直接将CSV文件读入spark DataFrame?

无法将 unicode .csv 读入 R

将多个 CSV 文件读入单独的数据帧

Python 将 csv 文件列读入列表,忽略标题

如何将压缩(gz)CSV文件读入dask Dataframe?