如何通过 Delimiter 拆分 Spark RDD 的行
Posted
技术标签:
【中文标题】如何通过 Delimiter 拆分 Spark RDD 的行【英文标题】:How to split rows of a Spark RDD by Deliminator 【发布时间】:2015-04-20 16:36:43 【问题描述】:我正在尝试将 Spark 中的数据拆分为 Array[String]
的 RDD 形式。目前我已将文件加载到String
的RDD 中。
> val csvFile = textFile("/input/spam.csv")
我想在 ,
分隔符上进行拆分。
【问题讨论】:
spark.apache.org/docs/latest/quick-start.html @ipoteka:我已经浏览了这个链接,但找不到任何 CSV 文件 ***.com/questions/24299427/… @ipoteka:谢谢,但你能不能把它写成代码,因为我是相对论新手,我无法理解它。 如果您的截止日期很紧,我建议您研究并研究该主题,而不是等待答案。提供的链接包含代码。经历一遍,尝试一下,如有必要,提出新问题。 'Plz zend me de codez' 不受欢迎'。 【参考方案1】:这个:
val csvFile = textFile("/input/spam.csv").map(line => line.split(","))
回复你RDD[Array[String]]
。
如果您需要第一列作为RDD
,则使用map
函数仅返回数组中的第一个索引:
val firstCol = csvFile.map(_.(0))
【讨论】:
谢谢,但我的第一行包含列标题,我想将其排除在外,并将其用于架构定义。 查看此链接how-do-i-convert-csv-file-to-rdd【参考方案2】:您应该使用spark-csv 库,它能够根据标题解析您的文件并允许您指定分隔符。此外,它在推断模式方面做得很好。我会让你阅读文档以发现大量的options 供你使用。
这可能看起来像这样:
sqlContext.read.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","your delimitor")
.load(pathToFile)
请注意,这会返回一个 DataFrame,您可能必须使用 .rdd
函数将其转换为 rdd。
当然,您必须将包加载到驱动程序中才能工作。
【讨论】:
【参考方案3】:// create spark session
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
// read csv
val df = spark.read
.format("csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.option("delimiter", ",")
.load("/your/csv/dir/simplecsv.csv")
// convert dataframe to rdd[row]
val rddRow = df.rdd
// print 2 rows
rddRow.take(2)
// convert df to rdd[string] for specific column
val oneColumn = df.select("colName").as[(String)].rdd
oneColumn.take(2)
// convert df to rdd[string] for multiple columns
val multiColumn = df.select("col1Name","col2Name").as[(String, String)].rdd
multiColumn.take(2)
【讨论】:
以上是关于如何通过 Delimiter 拆分 Spark RDD 的行的主要内容,如果未能解决你的问题,请参考以下文章
在 spark 中设置 textinputformat.record.delimiter
通过保留顺序,根据 id 列将 Spark DataFrame 拆分为两个 DataFrame(70% 和 30%)
拆分数据数据类型后的Spark RDD如何在不更改数据类型的情况下拆分
spark.read.options(header=True, delimiter="|").csv("mycsv") PySpark 中的 3 行花费了太多时