Spark读取一个文件夹中的所有csv文件

Posted CYDCS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark读取一个文件夹中的所有csv文件相关的知识,希望对你有一定的参考价值。

Spark读取一个文件夹中的所有csv文件

前提:dataset 文件夹中包含多个csv文件

1、直接读取
val data: DataFrame = spark.read.csv(“dataset/*.csv”) //使用星号匹配所有csv文件

2、对读取的内容进行设置
val data: DataFrame = spark.read.option(“inferSchema”, “true”) //通过设置inferSchema=true,Spark 将自动遍历 csv 文件并推断每列的架构。默认为false,数据读取为String类型。
.option(“header”, true) //选取数据第一行作为dataframe数据列的名称
.option(“dateFormat”, “yyyyMMdd”) //设置时间格式
.csv("dataset/*.csv)

Spark 读取 csv 文件,文件中的一列带有引号

【中文标题】Spark 读取 csv 文件,文件中的一列带有引号【英文标题】:Spark read for csv file with quotes on one column in file 【发布时间】:2022-01-16 19:43:54 【问题描述】:

我的 HDFS 位置有一个 csv 文件,它在一列上有引号。我的文件是 records.csv ,这是它的数据

 100,"surender,CHN",IND
 101,"ajay,HYD",IND



 scala> val schema = StructType(
 | Array(
 | StructField("emp_id", StringType, true),
 | StructField("emp_name", StringType, true),
 | StructField("emp_city", StringType, true),
 | StructField("emp_country", StringType, true)
 | )
 | )
 schema: org.apache.spark.sql.types.StructType = StructType(StructField(emp_id,StringType,true), StructField(emp_name,StringType,true), StructField(emp_city,StringType,true), StructField(emp_country,StringType,true))

 scala>

 scala> val loc = "/user/omega/records.csv"
 loc: String = /user/omega/records.csv

                                                                                          ^

 scala> val loc = "/user/omega/records.csv"
 loc: String = /user/omega/records.csv

 scala> val df = spark.read.option("delimiter", ",").option("quote", "\"").option("escape", "\"").schema(schema).csv(loc)
 df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string ... 2 more fields]

  scala> df.show(10,false)
  +------+------------+--------+-----------+
  |emp_id|emp_name    |emp_city|emp_country|
  +------+------------+--------+-----------+
  |100   |surender,CHN|IND     |null       |
  |101   |ajay,HYD    |IND     |null       |
  +------+------------+--------+-----------+

但我的预期输出是

  +------+------------+--------+-----------+
  |emp_id|emp_name    |emp_city|emp_country|
  +------+------------+--------+-----------+
  |100   |surender    |CHN     |IND       |
  |101   |ajay        |HYD     |IND       |
  +------+------------+--------+-----------+

我如何获得预期的输出?

我尝试了另一个代码,如下所示

  val df1 = spark.read.option("delimiter", ",").option("quote", "").option("escape quote", "").schema(schema).csv(loc)

上面的df1给出了以下结果

 +------+---------+--------+-----------+
 |emp_id| emp_name|emp_city|emp_country|
 +------+---------+--------+-----------+
 |   100|"surender|    CHN"|        IND|
 |   101|    "ajay|    HYD"|        IND|
 +------+---------+--------+-----------+

【问题讨论】:

【参考方案1】:

一个简单的解决方案是在读取 CSV 后清理数据

df
  .withColumn("emp_name", split(col("column_with_quotes"), ",").getItem(0))
  .withColumn("emp_city", split(col("column_with_quotes"), ",").getItem(1))
  .drop("column_with_quotes")

稍后更新

我查看了CSV options。你检查过这个选项吗?

unescapedQuoteHandling BACK_TO_DELIMITER #Defines how the CsvParser will handle values with unescaped quotes

【讨论】:

@gater ,这会起作用,但这对我没有帮助,因为 column_with_quotes 后面有大约 100 多列,我不想写很多 .withColumn

以上是关于Spark读取一个文件夹中的所有csv文件的主要内容,如果未能解决你的问题,请参考以下文章

将读取文件的架构存储到 spark scala 中的 csv 文件中

如何使用 spark(python)读取 zip 文件中的 CSV 文件的内容 [重复]

如何使用 spark-shell 读取 .csv 文件

读取 csv 文件时 MS Databricks Spark 中绝对 URI 中的相对路径

Spark s3 csv文件读取顺序

如何读取 csv 文件并将值分配给 spark scala 中的变量