RDD[Array[String]] 到数据框
Posted
技术标签:
【中文标题】RDD[Array[String]] 到数据框【英文标题】:RDD[Array[String]] to Dataframe 【发布时间】:2016-12-30 19:50:07 【问题描述】:我是 Spark 和 Hive 的新手,我的目标是将分隔的(比如说 csv)加载到 Hive 表中。经过一番阅读,我发现将数据加载到 Hive 的路径是csv->dataframe->Hive
。(如果我错了,请纠正我)。
CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus
我使用以下命令读取 csv 文件:
val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39
现在我正在尝试将此 RDD 转换为 Dataframe 并使用以下代码:
scala> val df = csv.map case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) .toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]
employee 是一个案例类,我将其用作架构定义。
case class employee(eid: String, name: String, salary: String, destination: String)
但是,当我执行df.show
时,出现以下错误:
org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 10.0 中的任务 0 失败 4 次,最近一次失败:丢失任务 0.3 在阶段 10.0 (TID 22, user.hostname): scala.MatchError: [Ljava.lang.String;@88ba3cb (of class [Ljava.lang.String;)
我期待一个数据框作为输出。我知道为什么我可能会收到此错误,因为 RDD 中的值以 Ljava.lang.String;@88ba3cb
格式存储,我需要使用 mkString
来获取实际值,但我无法找到如何去做。感谢您的宝贵时间。
【问题讨论】:
也许您可以与我们分享您的employee
是如何定义的?
opps 对不起,员工是案例类case class employee(eid: String, name: String, salary: String, destination: String)
您将所有内容解析为String
,因此您的eid: Int
将不起作用
什么版本的火花?
Spark 版本:1.5.0-cdh5.5.1
我也尝试将其更改为 String,但仍然出现相同的错误。
【参考方案1】:
如果你修复了你的案例类,那么它应该可以工作:
scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee
scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24
scala> txtRDD.mapcase Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3).toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
否则您可以将String
转换为Int
:
scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee
scala> val df = txtRDD.mapcase Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3).toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]
scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
但是最好的解决方案是使用spark-csv
(这会将薪水也视为Int
)。
还请注意,当您运行 df.show
时会引发错误,因为在此之前所有内容都在延迟评估。 df.show
是一个动作,它将导致所有排队的转换执行(请参阅this article 了解更多信息)。
【讨论】:
感谢@evan058 的回复。我尝试了您的解决方案,但仍然遇到相同的错误。scala> case class employee(eid: String, name: String, salary: String, destination: String) defined class employee
scala> val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim)) csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:31
scala> val df=csv.mapcase Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3).toDF() df:..spark.sql.DataFrame=[eid: string, name: string, salary: string, destination: string
scala> df.show
16/12/30 15:37:25 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 11, hostname): scala.MatchError: [Ljava.lang.String;@3297e00f (of class [Ljava.lang.String;) at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:35) at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:35) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
@Novice 尝试运行csv.foreachprintln
。那看起来像什么?
我刚刚运行它,但它没有打印任何东西。 scala> csv.foreachprintln
空行scala>
什么都没有?你确定你的文本文件加载正确吗?【参考方案2】:
在数组元素上使用映射,而不是在数组上:
val csv = sc.textFile("employee_data.txt")
.map(line => line
.split(",")
.map(e => e.map(_.trim))
)
val df = csv.map case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) .toDF()
但是,为什么要读取 CSV,然后将 RDD 转换为 DF? Spark 1.5 已经可以通过spark-csv
包读取 CSV:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.load("employee_data.txt")
【讨论】:
感谢 Gaweda 的回复。我可以使用,但我有一个要求,我可以使用逗号以外的其他分隔符获取文件。所以我不能使用spark-csv
@Novice 有参数delimiter
- 您可以将其设置为;
或其他一些
哦,我不知道。谢谢让我搜索更多。
在.load(...)
中你提到了employee_data.txt
。如果你有一个 RDD 而不是 txt 文件怎么办?【参考方案3】:
正如您在评论中所说,您的案例类员工(应命名为 Employee
)接收 Int
作为其构造函数的第一个参数,但您传递的是 String
。因此,您应该在实例化或修改将eid
定义为String
的案例之前将其转换为Int
。
【讨论】:
以上是关于RDD[Array[String]] 到数据框的主要内容,如果未能解决你的问题,请参考以下文章
将包含 BigInt 的 RDD 转换为 Spark Dataframe