如何将csv文件转换为rdd

Posted

技术标签:

【中文标题】如何将csv文件转换为rdd【英文标题】:How do I convert csv file to rdd 【发布时间】:2014-06-19 05:35:27 【问题描述】:

我是新来的火花。我想对 CSV 记录中的特定数据执行一些操作。

我正在尝试读取 CSV 文件并将其转换为 RDD。我的进一步操作基于 CSV 文件中提供的标题。

(来自 cmets) 到目前为止,这是我的代码:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>()  
    @Override public Iterable<String> call(String s)  
    return Arrays.asList(EOL.split(s)); 
     
);
final String heading=lines.first().toString();

我可以像这样获取标题值。我想将此映射到 CSV 文件中的每条记录。

final String[] header=heading.split(" "); 

我可以像这样获取标题值。我想将此映射到 CSV 文件中的每条记录。

在 java 中,我使用 CSVReader record.getColumnValue(Column header) 来获取特定值。我需要在这里做类似的事情。

【问题讨论】:

到目前为止你尝试过什么?你被困在哪里了? “Zend me da codez”不受欢迎。是事先知道 csv 结构还是需要从 headers 中发现? 我需要从标题中发现结构。 【参考方案1】:

一种简单的方法是保留标题。

假设您有一个类似的 file.csv:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

我们可以定义一个使用第一行的解析版本的头类:

class SimpleCSVHeader(header:Array[String]) extends Serializable 
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))

我们可以使用该标头来进一步处理数据:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

请注意,header 只不过是助记符到数组索引的简单映射。几乎所有这些都可以在数组中元素的序数位置完成,例如user = row(0)

PS:欢迎使用 Scala :-)

【讨论】:

PS:我知道你是 *** 的新手。欢迎。将来,您将通过尽可能具体地提出问题来获得更好的帮助,展示您到目前为止所做的事情以及您遇到的问题。尝试显示当前问题的小代码示例。 这是非常危险的:这是一个有效的 csv 行:"a,b,c,,d,,,,,," 拆分后它将是 ("a","b","c","","d"),因此您将在最后一个非空列之后丢失所有空列!【参考方案2】:

您可以使用 spark-csv 库:https://github.com/databricks/spark-csv

这直接来自文档:

import org.apache.spark.sql.SQLContext

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);

【讨论】:

这有点不完美/不完整的答案:因为设置 spark-csv 包并非易事,而且 OP 是“新的”。 @javadba 我认为这是这里唯一的通用答案。此处的所有其他尝试都假定您可以天真地用逗号拆分 csv,并且仅在一些非常简单的情况下才是正确的。 @zero323 经过几次尝试,我的 spark-csv 包失败了。我编写了自己的 csv 解析器。这就是为什么我不是这个答案的忠实粉丝。显然其他人的运气更好。【参考方案3】:

首先我必须说,如果将标题放在单独的文件中会简单得多——这是大数据中的惯例。

无论如何,丹尼尔的答案非常好,但是它效率低下和错误,所以我将发布我自己的。效率低下的是你不需要检查每条记录来查看它是否是标题,你只需要检查每个分区的第一条记录。错误在于,使用.split(","),当条目为空字符串并出现在记录的开头或结尾时,您可能会抛出异常或获取错误的列——更正您需要使用.split(",", -1)。所以这里是完整的代码:

val header =
  scala.io.Source.fromInputStream(
    hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
    .open(new hadoop.fs.Path(path)))
  .getLines.head

val columnIndex = header.split(",").indexOf(columnName)

sc.textFile(path).mapPartitions(iterator => 
  val head = iterator.next()
  if (head == header) iterator else Iterator(head) ++ iterator
)
.map(_.split(",", -1)(columnIndex))

最后一点,如果您只想找出某些列,请考虑 Parquet。或者,如果您的行很宽,至少可以考虑实现一个惰性求值的拆分函数。

【讨论】:

【参考方案4】:

我们可以使用新的 DataFrameRDD 来读写 CSV 数据。 DataFrameRDD 相对于 NormalRDD 的优势很少:

    DataFrameRDD 比 NormalRDD 快一点,因为我们确定了架构,这有助于在运行时进行大量优化并为我们提供显着的性能提升。 即使列在 CSV 中发生变化,它也会自动获取正确的列,因为我们没有将读取数据时存在的列号硬编码为 textFile,然后将其拆分,然后使用列号来获取数据. 只需几行代码,您就可以直接读取 CSV 文件。

你需要拥有这个库:在 build.sbt 中添加它

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"

Spark Scala 代码:

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it

通过从中获取一些列来转换为普通 RDD

val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it

将 RDD 保存为 CSV 格式:

val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")

由于标头设置为 true,我们将在所有输出文件中获取标头名称。

【讨论】:

【参考方案5】:

这是另一个使用 Spark/Scala 到 convert a CSV to RDD 的示例。有关更详细的说明,请参阅此post。

def main(args: Array[String]): Unit = 
  val csv = sc.textFile("/path/to/your/file.csv")

  // split / clean data
  val headerAndRows = csv.map(line => line.split(",").map(_.trim))
  // get header
  val header = headerAndRows.first
  // filter out header (eh. just check if the first val matches the first header name)
  val data = headerAndRows.filter(_(0) != header(0))
  // splits to map (header/value pairs)
  val maps = data.map(splits => header.zip(splits).toMap)
  // filter out the user "me"
  val result = maps.filter(map => map("user") != "me")
  // print result
  result.foreach(println)

【讨论】:

【参考方案6】:

我建议直接从驱动程序读取标题,而不是通过 Spark。造成这种情况的两个原因:1)它是单行。分布式方法没有任何优势。 2) 我们在驱动中需要这条线,而不是工作节点。

它是这样的:

// Ridiculous amount of code to read one line.
val uri = new java.net.URI(filename)
val conf = sc.hadoopConfiguration
val fs = hadoop.fs.FileSystem.get(uri, conf)
val path = new hadoop.fs.Path(filename)
val stream = fs.open(path)
val source = scala.io.Source.fromInputStream(stream)
val header = source.getLines.head

现在,当您制作 RDD 时,您可以丢弃标头。

val csvRDD = sc.textFile(filename).filter(_ != header)

然后我们可以从一列做一个RDD,例如:

val idx = header.split(",").indexOf(columnName)
val columnRDD = csvRDD.map(_.split(",")(idx))

【讨论】:

最终 JavaRDD File = sc.textFile(Filename).cache(); final JavaRDD 行 = File.flatMap(new FlatMapFunction() @Override public Iterable call(String s) return Arrays.asList(EOL.split(s)); ) ;最终字符串标题=lines.first().toString(); final String[] header=heading.split(" ");我可以得到这样的标题值。我想将它映射到 csv 文件中的每条记录。 啊,我明白了!那么请把这个放在问题中。我们不是读心者。 (显然。) 谢谢。我是 spark 新手。我想对记录中的特定数据执行一些操作。有没有其他方法可以做到这一点。 @user3754978 re:code -> 我建议你使用 Scala for Spark。很难阅读,该代码是。 re:记录操作->有几个选项,都取决于你想用它们做什么。你需要更具体。 根据大数据约定,我们有两个文件,分别是 HeaderFile 和 DataFile。Header 文件很小(读取不以分布式格式存储),因此我们可以直接通过 Hadoop(HDFS) 和DataFile 因为它很大,所以我们使用 Spark Context,因为数据本身将以分布式格式存储。所以这种方法对实时使用有很大帮助。【参考方案7】:

另一种选择是使用mapPartitionsWithIndex 方法,因为您将获得分区索引号和该分区内所有行的列表。分区 0 和行 0 将是标题

val rows = sc.textFile(path)
  .mapPartitionsWithIndex( (index: Int, rows: Iterator[String]) => 
    val results = new ArrayBuffer[(String, Int)]

    var first = true
    while (rows.hasNext) 
      // check for first line
      if (index == 0 && first) 
        first = false
        rows.next // skip the first row
       else 
        results += rows.next
      
    

    results.toIterator
, true)

rows.flatMap  row => row.split(",") 

【讨论】:

【参考方案8】:

这个怎么样?

val Delimeter = ","
val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))

【讨论】:

转换为文本文件后得到这样的输出 "id" "fname" "lname" "Address" "1" "xxxx" "yyyy" "xxxx" "-#-EOL-#-" " 2" "yyyy" "aaaa" "zzzz" "-#-EOL-#-" "3" "ssss" "ssss" "zzzz" "-#-EOL-#-" 所以重点是您不希望标头出现在 RDD 中,对吗?我根据这个假设添加了一个答案。 我需要一个类似于每个值的映射与标题形成这样的记录 <1 fname:xxx lname:yyyy address:zzzz> 【参考方案9】:

对于 spark scala,我通常在无法使用 spark csv 包时使用...

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
val header = rawdata.first()
val tbldata = rawdata.filter(_(0) != header(0))

【讨论】:

【参考方案10】:

建议你试试

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() 
    public Person call(String line) throws Exception 
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    
  );

您必须在此示例中拥有一个具有文件头规范的类,并将您的数据与架构相关联并应用 mysql 中的标准......以获得所需的结果

【讨论】:

【参考方案11】:

我认为您可以尝试将该 csv 加载到 RDD 中,然后从该 RDD 创建一个数据帧,这是从 rdd 创建数据帧的文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds

【讨论】:

【参考方案12】:

从 Spark 2.0 开始,CSV 可以直接读入 DataFrame

如果数据文件没有标题行,则为:

val df = spark.read.csv("file://path/to/data.csv")

这将加载数据,但为每一列提供通用名称,如 _c0_c1 等。

如果有标题,那么添加.option("header", "true") 将使用第一行来定义DataFrame 中的列:

val df = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

举个具体的例子,假设你有一个包含内容的文件:

user,topic,hits
om,scala,120
daniel,spark,80
3754978,spark,1

那么下面会得到按主题分组的总命中数:

import org.apache.spark.sql.functions._
import spark.implicits._

val rawData = spark.read
  .option("header", "true")
  .csv("file://path/to/data.csv")

// specifies the query, but does not execute it
val grouped = rawData.groupBy($"topic").agg(sum($"hits))

// runs the query, pulling the data to the master node
// can fail if the amount of data is too much to fit 
// into the master node's memory!
val collected = grouped.collect

// runs the query, writing the result back out
// in this case, changing format to Parquet since that can
//   be nicer to work with in Spark
grouped.write.parquet("hdfs://some/output/directory/")

// runs the query, writing the result back out
// in this case, in CSV format with a header and 
// coalesced to a single file.  This is easier for human 
// consumption but usually much slower.
grouped.coalesce(1)
  .write
  .option("header", "true")
  .csv("hdfs://some/output/directory/")

【讨论】:

以上是关于如何将csv文件转换为rdd的主要内容,如果未能解决你的问题,请参考以下文章

将 RDD 转换为 kmeans 的有效输入

如何将具有多类的 LibSVM 文件转换为 RDD [labelPoint]

如何使用 shell 脚本将 xlsx 文件转换为 csv 文件?

如何将CSV文件转换为python字典

如何将csv文件转换成excel文件呢?

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD