在 spark 中循环遍历 csv 文件的最佳方法

Posted

技术标签:

【中文标题】在 spark 中循环遍历 csv 文件的最佳方法【英文标题】:Best approach for looping through a csv file in spark 【发布时间】:2016-08-05 18:48:27 【问题描述】:

我是新来的火花。我正在关注文档中的一些基本示例。

我有一个这样的csv文件:(简化版,真实的有近4万行)

date,category
19900108,apples
19900108,apples
19900308,peaches
19900408,peaches
19900508,pears
19910108,pears
19910108,peaches
19910308,apples
19910408,apples
19910508,apples
19920108,pears
19920108,peaches
19920308,apples
19920408,peaches
19920508,pears

这段 scala 代码可以很好地计算类别总数

val textFile = sc.textFile("sample.csv")
textFile.filter(line => line.contains("1990")).filter(line =>line.contains("peaches")).count()
textFile.filter(line => line.contains("1990")).filter(line => line.contains("apples")).count()
textFile.filter(line => line.contains("1990")).filter(line => line.contains("pears")).count()

遍历每一行的最佳方法是什么,按年份添加类别总数,以便我最终编写一个这样的 csv 文件:

date,apples,peaches,pears
1990,2,2,1
1991,3,1,1
1992,1,2,2

任何帮助将不胜感激。

【问题讨论】:

Pivot Spark Dataframe的可能重复 【参考方案1】:
//Create Spark SQL Context    
val sqlContext = new SQLContext(sc)

//read csv
var df = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("sample.csv")
df = df.withColumn("year", df.col("date").substr(0,4))
df = df.groupBy("year").pivot("category").agg("category"->"count")
df.withColumn("total", df.col("apples").+(df.col("peaches")).+(df.col("pears"))).show()

//Dependency required:
<dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.4.0</version>
</dependency>

【讨论】:

Spark 2.0 最近发布,现在支持原生 CSV :) 感谢 VenkatN 的回答。过去几天我一直在天气不好,所以我没能早点看到这个。当我运行此脚本时,我在此行收到“错误重新分配 val”消息: df = df.withColumn("year", df.col("date").substr(0,4)) With Spark 2,0 does这意味着我没有包含 databricks csv 包? 出于同样的原因,我将“df”声明为 var 而不是 val。因此,为了避免“错误重新分配 val”,您必须做同样的事情,或者您可以初始化一个新变量而不是将其重新分配给 df,例如:val df2 = df.withColumn("year", df.col("date").substr(0,4))df2.groupBy("year").pivot("category").agg("category"-&gt;"count").show() 谢谢,好像可以了。我的错误是使用“val df”而不是“var df”过去几天我一直在与感冒作斗争,我想不通如果我想要一个总列怎么办,所以我的 CSV 文件看起来像这样:日期,总,苹果,桃子,梨 1990,5,2,2,1 请检查更新的答案,这应该对你有帮助。

以上是关于在 spark 中循环遍历 csv 文件的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

如何设置循环遍历CSV文件中每个值的jmeter测试?

spark中将每个组作为新数据帧并在循环中传递另一个函数的最佳方法是啥?

使用 Talend 循环遍历 .csv 文件

遍历 for 循环并将检索到的数据保存在每个循环的唯一 csv 文件中 | Python

Python 循环遍历 csv 文件中的 url 返回 \ufeffhttps://

Spark中的最佳重新分区方式