在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件

Posted

技术标签:

【中文标题】在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件【英文标题】:Conditionally map through rows in CSV file in Scala / Spark to produce another CSV file 【发布时间】:2016-07-30 10:47:36 【问题描述】:

我对 Scala / Spark 还很陌生,我已经陷入了困境。几周以来,我一直在努力为 Scala 2.11.8 上一个看似简单的问题找到解决方案,但一直无法找到一个好的解决方案。 我有一个接近 150 GB 的 csv 格式的大型数据库,其中包含大量空值,需要根据各个列的值进行缩减和清理。

原始CSV文件的架构如下:

第 1 列:双倍 第 2 列:整数 第 3 列:双倍 第 4 列:双倍 第 5 列:整数 第 6 列:双倍 第 7 列:整数

所以,我想有条件地映射 CSV 文件的所有行并将结果导出到另一个 CSV 文件,每行具有以下条件:

    如果第 4 列的值不为空,则该行的第 4、5、6 和 7 列的值应存储为名为 lastValuesOf4to7 的数组。 (数据集中如果第4列元素不为null,则第1、2、3列为null,可以忽略)

    如果第 3 列的值不为空,则如上所述,第 1、2 和 3 列的值以及 lastValuesOf4to7 数组中的四个元素应作为新行导出到另一个 CSV 文件中称为 condensed.csv。 (数据集中如果第3列元素不为空,则第4、5、6、7列为空,可以忽略)

所以最后我应该得到一个名为 condensed.csv 的 csv 文件,它有 7 列。

我曾尝试在 Scala 中使用以下代码,但未能取得进一步进展:

import scala.io.Source

object structuringData 
  def main(args: Array[String]) 

  val data = Source.fromFile("/path/to/file.csv") 

  var lastValuesOf4to7 = Array("0","0","0","0")

  val lines = data.getLines // Get the lines of the file

  val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge.



  data.close
  

正如您从上面的代码中看到的那样,我尝试将其移动到一个数组中,但由于无法单独处理每一行,因此无法进一步进行。

我很确定在 Scala / Spark 上处理 csv 文件必须有直接的解决方案。

【问题讨论】:

【参考方案1】:

使用 Spark-csv 包,然后使用 Sql 查询查询数据并根据您的用例制作过滤器,然后在最后导出。

如果您使用的是 spark 2.0.0,则 spark-csv 将存在于 spark-sql 中,否则如果您使用的是旧版本,请相应地添加依赖项。

您可以在此处找到指向spark-csv 的链接。

您也可以在这里查看示例:http://blog.madhukaraphatak.com/analysing-csv-data-in-spark/

【讨论】:

感谢您的回复,Shivansh。我还不熟悉 spark-sql,必须弄清楚这一点。我正在为 Hadoop 2.6 使用 Spark 1.6.1。 @Moose :我已经用示例链接更新了我的答案。如果您觉得答案足够好,请点赞并接受!谢谢:)【参考方案2】:

感谢您的回复。我设法自己使用 Bash Script 创建了一个解决方案。我必须先从一个空白的 condensed.csv 文件开始。我的代码显示了实现这一目标是多么容易:

#!/bin/bash
OLDIFS=$IFS
IFS=","
last1=0
last2=0
last3=0
last4=0
while read f1 f2 f3 f4 f5 f6 f7
do
    if [[ $f4 != "" ]];
    then 
        last1=$f4
        last2=$f5
        last3=$f6
        last4=$f7

    elif [[ $f3 != "" ]];
    then 
        echo "$f1,$f2,$f3,$last1,$last2,$last3,$last4" >> path/to/condensed.csv
    fi

done < $1
IFS=$OLDIFS

如果脚本以名称 extractcsv.sh 保存,那么它应该使用以下格式运行:

$ ./extractcsv.sh path/to/original/file.csv

这只是为了证实我的观察,即 ETL 在 Bash 上比在 Scala 上更容易。不过,感谢您的帮助。

【讨论】:

以上是关于在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章

02 使用spark进行词频统计【scala交互】

02 使用spark进行词频统计scala交互

通过Map Spark Scala循环

spark-shell的Scala的一些方法详解

如何使用 scala/python/spark 在 CSV 文件中创建新的时间戳(分钟)列

通过在spark中使用scala加载csv文件来创建数据帧