在 read.csv 期间使用限制的 Spark 推断架构

Posted

技术标签:

【中文标题】在 read.csv 期间使用限制的 Spark 推断架构【英文标题】:Spark infer schema with limit during a read.csv 【发布时间】:2019-04-29 14:43:20 【问题描述】:

我想使用一小部分行(例如limit(100))从 CSV 文件目录中推断出 Spark.DataFrame 架构。

但是,将inferSchema 设置为True 意味着FileScanRDDInput Size / Records 似乎总是等于所有CSV 文件中的行数。

有没有办法让 FileScan 更具选择性,以便 Spark 在推断架构时查看更少的行?

注意:将samplingRatio option 设置为

【问题讨论】:

您可以随时将两百行写入 tmp csv 文件,然后使用架构推断读取该文件,否则 csv 源将始终扫描整个文件以推断架构... 对;有几种“带外”方法有效;希望避免这种情况。但是,谢谢@eliasah! 这可能会在 Spark 3+ 中解决 github.com/apache/spark/blob/… 随机过滤数据,然后将其存储到临时文件中,读回临时文件,然后推断架构? 【参考方案1】:

您可以将输入数据的子集读入字符串的数据集中。 CSV 方法允许您将其作为参数传递。

这是一个简单的例子(我将把从输入文件中读取的行样本留给你):

val data = List("1,2,hello", "2,3,what's up?")
val csvRDD = sc.parallelize(data)
val df = spark.read.option("inferSchema","true").csv(csvRDD.toDS)
df.schema

在 spark-shell 中运行时,上面的最后一行会打印出来(为了便于阅读,我重新格式化了它):

res4: org.apache.spark.sql.types.StructType = 
    StructType(
      StructField(_c0,IntegerType,true),
      StructField(_c1,IntegerType,true),
      StructField(_c2,StringType,true)
    )

对于我有限的输入数据集,哪个是正确的架构。

【讨论】:

是的,这很聪明!与带外解决方案类似,我没有利用执行程序的分布式读取,而是对驱动程序进行本地读取。但它可以完成工作。 谢谢。如果从 HDFS 读取数据,您仍然可以从集群读取数据并在创建 csvRDD 数据集时使用 limit 方法。这就是执行者的分布式读取的意思吗? @Jedi,如果这对您有所帮助,请您接受它作为答案。【参考方案2】:

假设您只对架构感兴趣,这是基于cipri.l 在此link 中的帖子的一种可能方法

import org.apache.spark.sql.execution.datasources.csv.CSVOptions, TextInputCSVDataSource
def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, sampleSize: Int, isFirstRowHeader: Boolean): StructType = 
  // Build a Dataset composed of the first sampleSize lines from the input files as plain text strings
  val dataSample: Array[String] = sparkSession.read.textFile(fileLocation).head(sampleSize)
  import sparkSession.implicits._
  val sampleDS: Dataset[String] = sparkSession.createDataset(dataSample)
  // Provide information about the CSV files' structure
  val firstLine = dataSample.head
  val extraOptions = Map("inferSchema" -> "true",   "header" -> isFirstRowHeader.toString)
  val csvOptions: CSVOptions = new CSVOptions(extraOptions, sparkSession.sessionState.conf.sessionLocalTimeZone)
  // Infer the CSV schema based on the sample data
  val schema = TextInputCSVDataSource.inferFromDataset(sparkSession, sampleDS, Some(firstLine), csvOptions)
  schema

与上面的GMc 的回答不同,这种方法尝试直接推断架构,就像 DataFrameReader.csv() 在后台所做的那样(但无需通过该架构构建额外的数据集,然后我们只会用它来从中检索模式)

架构是根据一个 Dataset[String] 推断出来的,该数据集仅包含输入文件中的前 sampleSize 行作为纯文本字符串。

在尝试从数据中检索样本时,Spark 只有两种方法:

    检索给定百分比数据的方法。此操作从所有分区中获取随机样本。它得益于更高的并行性,但它必须读取所有输入文件。 检索特定行数的方法。此操作必须收集驱动程序上的数据,但它可以读取单个分区(如果所需的行数足够低)

由于您提到要使用特定的少量行,并且您希望避免接触所有数据,因此我提供了基于选项 2 的解决方案

PS:DataFrameReader.textFile 方法接受文件、文件夹的路径,并且它还有一个可变参数变体,因此您可以传入一个或多个文件或文件夹。

【讨论】:

谢谢!对,这里有一个共同的主题;为了避免从所有分区读取,这个解决方案和@GMc 的解决方案都在驱动程序上构造读取行的子集并从中创建一个 RDD。

以上是关于在 read.csv 期间使用限制的 Spark 推断架构的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark spark.read.csv().collect() 返回一个空列表

Spark读取一个文件夹中的所有csv文件

使用跳过行在 Spark 中读取 csv

无法使用 spark(sqlContext) 在 aws redshift 中写入 csv 数据

为啥 spark csv 会过滤掉空行?

spark 2.x 正在使用 csv 函数将整数/双列作为字符串读取