pyspark 使用模式将 csv 文件加载到数据框中

Posted

技术标签:

【中文标题】pyspark 使用模式将 csv 文件加载到数据框中【英文标题】:pyspark load csv file into dataframe using a schema 【发布时间】:2019-02-13 13:24:35 【问题描述】:

我是 pyspark 的新手,正在使用 Spark 2.2.0 版和 Python 2.7.12 版开发 pyspark

我正在尝试将 2 个 .csv 文件(具有超过 1 个标题行)读入 2 个具有已知架构的不同数据帧并执行比较操作。

我不确定是否有任何最佳/更好的方法来创建架构文件(包括列名、数据类型、可空性)并在 pyspark 程序中引用它以加载到数据帧中。

我为第一个文件编码如下:

    创建一个 yaml 文件来存储文件路径和架构

    读取模式文件并在循环中动态构造 StructField(column name, datatype, nullanbility)。 例子: [StructField(column1,Integer,true), StructField(column2,string,true), StructField(column3,decimal(10,2),true), ....]

    将数据文件读入RDD并删除2个标题行(将使用减法函数)

    使用 sqlContext.createDataFrame 通过传递 RDD、模式结构创建数据帧。

我可以为第一个文件的示例数据执行这些步骤。

请建议是否有更好的方法(我还没有探索 StructType 的 fromDDL 选项)。为第二个文件创建类似的数据框后,需要应用功能逻辑。

谢谢

【问题讨论】:

只是想明白,你的每个 csv 文件都有 '1' header 。那么,'2' csv 文件你有并且想用你说的方法“删除 2 个标题行”? csv 文件每个有 2 个标题 我能够完成这 4 个步骤并创建数据框。保持开放以了解是否有更好的方法。 【参考方案1】:

如何使用 pyspark spark.read.csv 读取文件,其中 stucttype 用于架构,选项 header=true 和 mode=DROPMALFORMED 将忽略任何与架构不匹配的记录。

【讨论】:

嗨,Ron D,保持 header=true 不起作用。相反,我所做的只是强制执行模式而不指定标头。之后,我使用 dropna 选项过滤掉了 2 个标题记录。保持问题的开放性以了解哪种方法更好。【参考方案2】:

我可以使用 yaml 配置文件(存储架构)并从 pyspark 读取以动态构建 StructType。

它正在工作并满足要求。如果有更好的方法,很高兴听到。

【讨论】:

以上是关于pyspark 使用模式将 csv 文件加载到数据框中的主要内容,如果未能解决你的问题,请参考以下文章

加载csv文件s3 pyspark的随机样本

Pyspark 写入数据帧并将其保存到 csv 文件中会出现错误

pyspark 遍历 hdfs 目录并将数据加载到多个表中

Pyspark 解释了使用和不使用自定义模式来读取 csv 的区别

Pyspark - 将数据帧写入 2 个不同的 csv 文件

如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?