如何通过 Delimiter 拆分 Spark RDD 的行

Posted

技术标签:

【中文标题】如何通过 Delimiter 拆分 Spark RDD 的行【英文标题】:How to split rows of a Spark RDD by Deliminator 【发布时间】:2015-04-20 16:36:43 【问题描述】:

我正在尝试将 Spark 中的数据拆分为 Array[String] 的 RDD 形式。目前我已将文件加载到String 的RDD 中。

> val csvFile = textFile("/input/spam.csv")

我想在 , 分隔符上进行拆分。

【问题讨论】:

spark.apache.org/docs/latest/quick-start.html @ipoteka:我已经浏览了这个链接,但找不到任何 CSV 文件 ***.com/questions/24299427/… @ipoteka:谢谢,但你能不能把它写成代码,因为我是相对论新手,我无法理解它。 如果您的截止日期很紧,我建议您研究并研究该主题,而不是等待答案。提供的链接包含代码。经历一遍,尝试一下,如有必要,提出新问题。 'Plz zend me de codez' 不受欢迎'。 【参考方案1】:

这个:

val csvFile = textFile("/input/spam.csv").map(line => line.split(","))

回复你RDD[Array[String]]

如果您需要第一列作为RDD,则使用map 函数仅返回数组中的第一个索引:

  val firstCol = csvFile.map(_.(0))

【讨论】:

谢谢,但我的第一行包含列标题,我想将其排除在外,并将其用于架构定义。 查看此链接how-do-i-convert-csv-file-to-rdd【参考方案2】:

您应该使用spark-csv 库,它能够根据标题解析您的文件并允许您指定分隔符。此外,它在推断模式方面做得很好。我会让你阅读文档以发现大量的options 供你使用。

这可能看起来像这样:

sqlContext.read.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","your delimitor")
.load(pathToFile)

请注意,这会返回一个 DataFrame,您可能必须使用 .rdd 函数将其转换为 rdd。

当然,您必须将包加载到驱动程序中才能工作。

【讨论】:

【参考方案3】:
// create spark session
val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

// read csv
val df = spark.read
         .format("csv")
         .option("header", "true") //reading the headers
         .option("mode", "DROPMALFORMED")
         .option("delimiter", ",")
         .load("/your/csv/dir/simplecsv.csv")

// convert dataframe to rdd[row]
val rddRow = df.rdd
// print 2 rows
rddRow.take(2)

// convert df to rdd[string] for specific column
val oneColumn = df.select("colName").as[(String)].rdd
oneColumn.take(2)

// convert df to rdd[string] for multiple columns
val multiColumn = df.select("col1Name","col2Name").as[(String, String)].rdd
multiColumn.take(2)

【讨论】:

以上是关于如何通过 Delimiter 拆分 Spark RDD 的行的主要内容,如果未能解决你的问题,请参考以下文章

在 spark 中设置 textinputformat.record.delimiter

Spark 如何跟踪 randomSplit 中的拆分?

通过保留顺序,根据 id 列将 Spark DataFrame 拆分为两个 DataFrame(70% 和 30%)

拆分数据数据类型后的Spark RDD如何在不更改数据类型的情况下拆分

spark.read.options(header=True, delimiter="|").csv("mycsv") PySpark 中的 3 行花费了太多时

如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?