动态和可配置地更改几种 Spark DataFrame 列类型

Posted

技术标签:

【中文标题】动态和可配置地更改几种 Spark DataFrame 列类型【英文标题】:Changing several Spark DataFrame column types, dynamically and configurable 【发布时间】:2017-11-27 22:07:04 【问题描述】:

我是 Spark 和 Scala 的新手。

我们有一个外部数据源为我们提供 JSON。此 JSON 包含所有值的引号,包括数字和布尔字段。所以当我把它放到我的 DataFrame 中时,所有的列都是字符串。最终目标是将这些 JSON 记录转换为正确类型的 Parquet 文件。

大约有 100 个字段,我需要将几个类型从字符串更改为 int、boolean 或 bigint(long)。此外,我们处理的每个 DataFrame 将只有这些字段的一个子集,而不是全部。因此,我需要能够处理给定 DataFrame 的列子集,将每一列与已知的列类型列表进行比较,并将某些列从字符串转换为 int、bigint 和 boolean,具体取决于 DataFrame 中出现的列。

最后,我需要可配置的列类型列表,因为我们将来会有新列,并且可能想要摆脱或更改旧列。

所以,这是我目前所拥有的:

// first I convert to all lower case for column names
val df = dfIn.toDF(dfIn.columns map(_.toLowerCase): _*)

// Big mapping to change types
// TODO how would I make this configurable?
// I'd like to drive this list from an external config file.
val dfOut = df.select(
   df.columns.map 

     ///// Boolean
     case a @ "a" => df(a).cast(BooleanType).as(a)
     case b @ "b" => df(b).cast(BooleanType).as(b)

     ///// Integer
     case i @ "i" => df(i).cast(IntegerType).as(i)
     case j @ "j" => df(j).cast(IntegerType).as(j)


     // Bigint to Double
     case x @ "x" => df(x).cast(DoubleType).as(x)
     case y @ "y" => df(y).cast(DoubleType).as(y)

     case other         => df(other)
   : _*
)

这是一种将这些数据转换为我想要的 Scala 类型的有效方法吗?

我可以使用一些建议来了解如何将其从外部“配置”文件中移除,我可以在其中定义列类型。

【问题讨论】:

【参考方案1】:

我的问题演变成这个问题。那里给出了很好的答案:

Spark 2.2 Scala DataFrame select from string array, catching errors

【讨论】:

以上是关于动态和可配置地更改几种 Spark DataFrame 列类型的主要内容,如果未能解决你的问题,请参考以下文章

Spark 参数配置的几种方法

Spark 参数配置的几种方法

Spark Project中动态更改表中要查询的列

在 Spark 中使用相应的列名(有条件地)更改数据框

Flink的local模式部署安装

寒假十四