如何在 pyspark 中启用 csv 文件的多行读取

Posted

技术标签:

【中文标题】如何在 pyspark 中启用 csv 文件的多行读取【英文标题】:How to enable multiline reading of a csv file in pyspark 【发布时间】:2018-11-18 07:04:10 【问题描述】:

我正在通过 PySpark 读取 CSV 文件。它是一个插入符号分隔的文件。 它有 5 列。我只需要 3 列。

rdd = sc.textFile("test.csv").map(lambda x: x.split("^")).filter(lambda x: len(x)>1).map(lambda x: (x[0], x[2], x[3]))

print rdd.take(5)

如下图所示,csv 文件中的数据在第 4 条记录处有一个多行数据,最后只有一列。因此,尽管该文件只有 5 条记录,但 spark 将其视为 6 条记录。所以我面临索引超出范围错误。

file.csv 中的数据:

a1^b1^c1^d1^e1
a2^b2^c2^d2^e2
a3^b3^c3^d3^e3
a4^b4^c4^d4 is 
multiline^e4
a5^b5^c5^d5^e5

如何在创建rddsc.textFile() 时启用multiline

【问题讨论】:

当我们将 rdd 创建为 spark.read.csv.option("multiLine", "true").('file.csv') 时,我在网上看到了启用多行的示例但我不能为 sc.textFile() 找到任何地方 您的意思是您只想从文本文件中读取 5 列? 让我们说如下所述,一个文件中有 5 列,只有 4 条记录。我只读最后一列。如果您看到最后一条记录在最后但一列中有多行。因此,我收到了一个错误。 a1^b1^c1^d1^e1 a2^b2^c2^d2^e2 a3^b3^c3^d3^e3 a4^b4^c4^d4 is very lenghty^e4 @Sri - 你的问题不清楚。您能否请edit您的问题,并使用输入样本和预期输出更新您的问题。您正在运行的代码和代码输出。 @Sri - 你为什么希望使用sc.textFile / rdd 而不是使用spark.read.csv.option("multiLine", "true").('file.csv') 【参考方案1】:

在我的分析中我知道,它不能通过 sc.textFile() 来完成,原因是一旦我们将 s3 文件加载到 rdd,那么 rdd 就会有元素列表作为每个记录一个s3文件。在这个级别本身,多行中的每一行都被分成不同的记录。所以不能通过sc.textFile()来实现。

【讨论】:

【参考方案2】:
from pyspark.sql.session import SparkSession

spark = SparkSession(sc)
rdd = spark.read.csv("csv.csv", multiLine=True, header="False",sep = "^", escape= "\"")

【讨论】:

虽然这段代码 sn-p 可以解决问题,但including an explanation 确实有助于提高帖子的质量。请记住,您是在为将来的读者回答问题,而这些人可能不知道您提出代码建议的原因。

以上是关于如何在 pyspark 中启用 csv 文件的多行读取的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?

如何从 pyspark 数据框中更快地保存 csv 文件?

如何在 pyspark 数据框中读取 csv 文件时读取选定的列?

如何使用pyspark流计算csv文件中的条目数

如何从pyspark中的文件中匹配/提取多行模式

如何在PySpark中调用python函数?