在 read.csv 期间使用限制的 Spark 推断架构
Posted
技术标签:
【中文标题】在 read.csv 期间使用限制的 Spark 推断架构【英文标题】:Spark infer schema with limit during a read.csv 【发布时间】:2019-04-29 14:43:20 【问题描述】:我想使用一小部分行(例如limit(100)
)从 CSV 文件目录中推断出 Spark.DataFrame 架构。
但是,将inferSchema
设置为True
意味着FileScanRDD
的Input Size / Records
似乎总是等于所有CSV 文件中的行数。
有没有办法让 FileScan 更具选择性,以便 Spark 在推断架构时查看更少的行?
注意:将samplingRatio
option 设置为
【问题讨论】:
您可以随时将两百行写入 tmp csv 文件,然后使用架构推断读取该文件,否则 csv 源将始终扫描整个文件以推断架构... 对;有几种“带外”方法有效;希望避免这种情况。但是,谢谢@eliasah! 这可能会在 Spark 3+ 中解决 github.com/apache/spark/blob/… 随机过滤数据,然后将其存储到临时文件中,读回临时文件,然后推断架构? 【参考方案1】:您可以将输入数据的子集读入字符串的数据集中。 CSV 方法允许您将其作为参数传递。
这是一个简单的例子(我将把从输入文件中读取的行样本留给你):
val data = List("1,2,hello", "2,3,what's up?")
val csvRDD = sc.parallelize(data)
val df = spark.read.option("inferSchema","true").csv(csvRDD.toDS)
df.schema
在 spark-shell 中运行时,上面的最后一行会打印出来(为了便于阅读,我重新格式化了它):
res4: org.apache.spark.sql.types.StructType =
StructType(
StructField(_c0,IntegerType,true),
StructField(_c1,IntegerType,true),
StructField(_c2,StringType,true)
)
对于我有限的输入数据集,哪个是正确的架构。
【讨论】:
是的,这很聪明!与带外解决方案类似,我没有利用执行程序的分布式读取,而是对驱动程序进行本地读取。但它可以完成工作。 谢谢。如果从 HDFS 读取数据,您仍然可以从集群读取数据并在创建 csvRDD 数据集时使用 limit 方法。这就是执行者的分布式读取的意思吗? @Jedi,如果这对您有所帮助,请您接受它作为答案。【参考方案2】:假设您只对架构感兴趣,这是基于cipri.l 在此link 中的帖子的一种可能方法
import org.apache.spark.sql.execution.datasources.csv.CSVOptions, TextInputCSVDataSource
def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, sampleSize: Int, isFirstRowHeader: Boolean): StructType =
// Build a Dataset composed of the first sampleSize lines from the input files as plain text strings
val dataSample: Array[String] = sparkSession.read.textFile(fileLocation).head(sampleSize)
import sparkSession.implicits._
val sampleDS: Dataset[String] = sparkSession.createDataset(dataSample)
// Provide information about the CSV files' structure
val firstLine = dataSample.head
val extraOptions = Map("inferSchema" -> "true", "header" -> isFirstRowHeader.toString)
val csvOptions: CSVOptions = new CSVOptions(extraOptions, sparkSession.sessionState.conf.sessionLocalTimeZone)
// Infer the CSV schema based on the sample data
val schema = TextInputCSVDataSource.inferFromDataset(sparkSession, sampleDS, Some(firstLine), csvOptions)
schema
与上面的GMc 的回答不同,这种方法尝试直接推断架构,就像 DataFrameReader.csv() 在后台所做的那样(但无需通过该架构构建额外的数据集,然后我们只会用它来从中检索模式)
架构是根据一个 Dataset[String] 推断出来的,该数据集仅包含输入文件中的前 sampleSize
行作为纯文本字符串。
在尝试从数据中检索样本时,Spark 只有两种方法:
-
检索给定百分比数据的方法。此操作从所有分区中获取随机样本。它得益于更高的并行性,但它必须读取所有输入文件。
检索特定行数的方法。此操作必须收集驱动程序上的数据,但它可以读取单个分区(如果所需的行数足够低)
由于您提到要使用特定的少量行,并且您希望避免接触所有数据,因此我提供了基于选项 2 的解决方案
PS:DataFrameReader.textFile 方法接受文件、文件夹的路径,并且它还有一个可变参数变体,因此您可以传入一个或多个文件或文件夹。
【讨论】:
谢谢!对,这里有一个共同的主题;为了避免从所有分区读取,这个解决方案和@GMc 的解决方案都在驱动程序上构造读取行的子集并从中创建一个 RDD。以上是关于在 read.csv 期间使用限制的 Spark 推断架构的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark spark.read.csv().collect() 返回一个空列表