使用跳过行在 Spark 中读取 csv

Posted

技术标签:

【中文标题】使用跳过行在 Spark 中读取 csv【英文标题】:Read csv in Spark with skip rows 【发布时间】:2019-06-10 17:08:43 【问题描述】:

我正在尝试将数据从 csv 文件加载到 DataFrame。我必须使用spark.read.csv() 函数,因为rdd sc.fileText() 不适用于文件中的特定数据(csv 数据中有不可见的逗号字符,rdd sc.fileText() 将它们拆分)。 csv 文件在开始时有几行要跳过:

Report <- to skip
Generated by YYYY-MM-DD <- to skip
Sessions <- to skip
<- to skip
Session of all hosts <- to skip
col1,col2,col3,col4,col5 <- it is my header
tst1,tst2,tst3,tst4,tst5 <- my data start here
tst6,tst7,tst8,tst9,tst10
...

我想要一个这样的数据框:

tst1,tst2,tst3,tst4,tst5 <- data
tst6,tst7,tst8,tst9,tst10

我尝试使用地图和过滤功能,但无法正常工作:

.rdd.map(lambda line: str(line).split(','))\
.filter(lambda line: len(line)>3).collect()

【问题讨论】:

【参考方案1】:

我找不到任何可以让我跳过指定行数的东西,但我能够通过指定架构然后读取数据并过滤掉我不想带来的任何东西来完成相同的结果。

import org.apache.spark.sql.types.StructType, StringType

val fileSchema = new StructType()
  .add("column1", StringType)
  .add("column2", StringType)
  .add("column3", StringType)
  .add("column4", StringType)
  .add("column5", StringType)

val data = spark.read.schema(fileSchema).csv("s3a://aws-s3-test-bucket/jeremy/foo.txt")
data.show(false)

产量

+-----------------------+-------+-------+-------+-------+
|column1                |column2|column3|column4|column5|
+-----------------------+-------+-------+-------+-------+
|Report                 |null   |null   |null   |null   |
|Generated by YYYY-MM-DD|null   |null   |null   |null   |
|Sessions               |null   |null   |null   |null   |
|Session of all hosts   |null   |null   |null   |null   |
|col1                   |col2   |col3   |col4   |col5   |
|tst1                   |tst2   |tst3   |tst4   |tst5   |
|tst6                   |tst7   |tst8   |tst9   |tst10  |
+-----------------------+-------+-------+-------+-------+

然后,您可以过滤掉包含您知道不应该在其中的数据的行,例如您想跳过的行和列名。

我建议在创建架构时使用实际的列名,而不是使用 column1 -> real_name_of_column1 来获取所需数据帧的占位符。我在示例中使用了占位符来强调这样一个事实,即我在创建架构时使用的列名不一定需要由数据通知。

+-----------------------+-------+-------+-------+-------+
|col1                   |col2   |col3   |col4   |col5   |
+-----------------------+-------+-------+-------+-------+
|tst1                   |tst2   |tst3   |tst4   |tst5   |
|tst6                   |tst7   |tst8   |tst9   |tst10  |
...
+-----------------------+-------+-------+-------+-------+

【讨论】:

以上是关于使用跳过行在 Spark 中读取 csv的主要内容,如果未能解决你的问题,请参考以下文章

Python Pandas read_csv 跳过行但保留标题

Python Pandas,读取文件并在标题前跳过行

如果所有非可选字段为空,如何在 CsvHelper.Configuration 中配置跳过记录

如何从mysql中的表中跳过行

Pandas-如何添加条目跳过行的列?

SQL SELECT 语句根据 WHERE 语句跳过行