在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件
Posted
技术标签:
【中文标题】在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件【英文标题】:Conditionally map through rows in CSV file in Scala / Spark to produce another CSV file 【发布时间】:2016-07-30 10:47:36 【问题描述】:我对 Scala / Spark 还很陌生,我已经陷入了困境。几周以来,我一直在努力为 Scala 2.11.8 上一个看似简单的问题找到解决方案,但一直无法找到一个好的解决方案。 我有一个接近 150 GB 的 csv 格式的大型数据库,其中包含大量空值,需要根据各个列的值进行缩减和清理。
原始CSV文件的架构如下:
第 1 列:双倍 第 2 列:整数 第 3 列:双倍 第 4 列:双倍 第 5 列:整数 第 6 列:双倍 第 7 列:整数所以,我想有条件地映射 CSV 文件的所有行并将结果导出到另一个 CSV 文件,每行具有以下条件:
如果第 4 列的值不为空,则该行的第 4、5、6 和 7 列的值应存储为名为 lastValuesOf4to7 的数组。 (数据集中如果第4列元素不为null,则第1、2、3列为null,可以忽略)
如果第 3 列的值不为空,则如上所述,第 1、2 和 3 列的值以及 lastValuesOf4to7 数组中的四个元素应作为新行导出到另一个 CSV 文件中称为 condensed.csv。 (数据集中如果第3列元素不为空,则第4、5、6、7列为空,可以忽略)
所以最后我应该得到一个名为 condensed.csv 的 csv 文件,它有 7 列。
我曾尝试在 Scala 中使用以下代码,但未能取得进一步进展:
import scala.io.Source
object structuringData
def main(args: Array[String])
val data = Source.fromFile("/path/to/file.csv")
var lastValuesOf4to7 = Array("0","0","0","0")
val lines = data.getLines // Get the lines of the file
val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge.
data.close
正如您从上面的代码中看到的那样,我尝试将其移动到一个数组中,但由于无法单独处理每一行,因此无法进一步进行。
我很确定在 Scala / Spark 上处理 csv 文件必须有直接的解决方案。
【问题讨论】:
【参考方案1】:使用 Spark-csv 包,然后使用 Sql 查询查询数据并根据您的用例制作过滤器,然后在最后导出。
如果您使用的是 spark 2.0.0,则 spark-csv 将存在于 spark-sql 中,否则如果您使用的是旧版本,请相应地添加依赖项。
您可以在此处找到指向spark-csv 的链接。
您也可以在这里查看示例:http://blog.madhukaraphatak.com/analysing-csv-data-in-spark/
【讨论】:
感谢您的回复,Shivansh。我还不熟悉 spark-sql,必须弄清楚这一点。我正在为 Hadoop 2.6 使用 Spark 1.6.1。 @Moose :我已经用示例链接更新了我的答案。如果您觉得答案足够好,请点赞并接受!谢谢:)【参考方案2】:感谢您的回复。我设法自己使用 Bash Script 创建了一个解决方案。我必须先从一个空白的 condensed.csv 文件开始。我的代码显示了实现这一目标是多么容易:
#!/bin/bash
OLDIFS=$IFS
IFS=","
last1=0
last2=0
last3=0
last4=0
while read f1 f2 f3 f4 f5 f6 f7
do
if [[ $f4 != "" ]];
then
last1=$f4
last2=$f5
last3=$f6
last4=$f7
elif [[ $f3 != "" ]];
then
echo "$f1,$f2,$f3,$last1,$last2,$last3,$last4" >> path/to/condensed.csv
fi
done < $1
IFS=$OLDIFS
如果脚本以名称 extractcsv.sh 保存,那么它应该使用以下格式运行:
$ ./extractcsv.sh path/to/original/file.csv
这只是为了证实我的观察,即 ETL 在 Bash 上比在 Scala 上更容易。不过,感谢您的帮助。
【讨论】:
以上是关于在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章