如何使用 scala spark 从没有标题且有超过 150 列的 csv 创建数据集

Posted

技术标签:

【中文标题】如何使用 scala spark 从没有标题且有超过 150 列的 csv 创建数据集【英文标题】:How to create a Dataset from a csv which doesn't have a header and has more than 150 columns using scala spark 【发布时间】:2020-01-22 12:16:07 【问题描述】:

我有一个 csv,我需要将其作为数据集读取。 csv 有 140 列,并且没有标题。 我用StructType(Seq(StructFiled(...), Seq(StructFiled(...), ...)) 创建了一个模式,要读取的代码如下:-

object dataParser 
def getData(inputPath: String, delimeter: String)(implicit spark: SparkSession): Dataset[MyCaseClass] = 
  val parsedData: Dataset[MyCaseClass] = spark.read
                                         .option("header", "false")
                                         .option("delimeter", "delimeter")
                                         .option("inferSchema", "true")
                                         .schema(mySchema)
                                         .load(inputPath)
                                         .as[MyCaseClass]
  parsedData


而我创建的案例类是这样的:-

case class MycaseClass(
    mycaseClass1: MyCaseClass1,
    mycaseClass2: MyCaseClass2,
    mycaseClass3: MyCaseClass3,
    mycaseClass4: MyCaseClass4,
    mycaseClass5: MyCaseClass5,
    mycaseClass6: MyCaseClass6,
    mycaseClass7: MyCaseClass7,
)

MyCaseClass1(
 first 20 columns of csv: it's datatypes
)

MyCaseClass2(
 next 20 columns of csv: it's datatypes
)

等等。

但是当我尝试编译它时,它给了我如下错误:-

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]                                                 .as[myCaseClass]

我从我的 Scala 应用程序中将其称为:-

object MyTestApp
 def main(args: Array[String]): Unit =
  implicit val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
 run(args)


def run(args: Array[String])(implicit spark: SparkSession): Unit = 
val inputPath = args.get("inputData")
val delimeter = Constants.delimeter
 val myData = Dataparser.getData(inputPath, delimeter)


```
I'm not very sure about the approach also as I'm new to Dataset.
I saw multiple answers around this issue but they were mainly for very small no of columns which can be contained within the scope of a single case class and that too with header which makes this little simpler.
Any help would be really appreciated.

【问题讨论】:

你的案例类中有哪些数据类型? 因为它有 140 列,所以它具有所有数据类型,主要是 String、Integer 和 Date。 【参考方案1】:

感谢所有的观众。其实我发现了这个问题。在此处发布答案,以便遇到任何此类问题的其他人能够摆脱此问题。

我需要在这里导入 spark.implicits._

object dataParser 
def getData(inputPath: String, delimeter: String)(implicit spark: SparkSession): Dataset[MyCaseClass] = 
**import spark.implicits._**
  val parsedData: Dataset[MyCaseClass] = spark.read
                                         .option("header", "false")
                                         .option("delimeter", "delimeter")
                                         .option("inferSchema", "true")
                                         .schema(mySchema)
                                         .load(inputPath)
                                         .as[MyCaseClass]
  parsedData


【讨论】:

是的,那是在应用程序中,但我们甚至在解析器中也需要它。 嗯,看来这段代码还有更多问题。这里需要一点帮助。架构很重,大约有 150 列,但案例类的限制为 23。那么,我们如何在读取时将 as 部分提供给数据集? 你可以对,案例类不能处理这么多列:(但是我知道一个技巧。我会在一段时间后发布答案。

以上是关于如何使用 scala spark 从没有标题且有超过 150 列的 csv 创建数据集的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala 在 Spark 中进行滑动窗口排名?

如何使用 Scala 从 Spark 中的列表或数组创建行

如何在窗口 scala/spark 中使用 partitionBy 函数

Scala/Spark - 如何获取所有子数组的第一个元素

如何使用 spark/scala 将 json 字符串格式化为 MongoDB 文档样式?

如何使用 spark/scala 检查是不是存在大查询表