RDD[Array[String]] 到数据框

Posted

技术标签:

【中文标题】RDD[Array[String]] 到数据框【英文标题】:RDD[Array[String]] to Dataframe 【发布时间】:2016-12-30 19:50:07 【问题描述】:

我是 Spark 和 Hive 的新手,我的目标是将分隔的(比如说 csv)加载到 Hive 表中。经过一番阅读,我发现将数据加载到 Hive 的路径是csv->dataframe->Hive。(如果我错了,请纠正我)。

CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus

我使用以下命令读取 csv 文件:

val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39

现在我正在尝试将此 RDD 转换为 Dataframe 并使用以下代码:

scala> val df = csv.map  case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) .toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]

employee 是一个案例类,我将其用作架构定义。

case class employee(eid: String, name: String, salary: String, destination: String)

但是,当我执行df.show 时,出现以下错误:

org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 10.0 中的任务 0 失败 4 次,最近一次失败:丢失任务 0.3 在阶段 10.0 (TID 22, user.hostname): scala.MatchError: [Ljava.lang.String;@88ba3cb (of class [Ljava.lang.String;)

我期待一个数据框作为输出。我知道为什么我可能会收到此错误,因为 RDD 中的值以 Ljava.lang.String;@88ba3cb 格式存储,我需要使用 mkString 来获取实际值,但我无法找到如何去做。感谢您的宝贵时间。

【问题讨论】:

也许您可以与我们分享您的employee 是如何定义的? opps 对不起,员工是案例类case class employee(eid: String, name: String, salary: String, destination: String) 您将所有内容解析为String,因此您的eid: Int 将不起作用 什么版本的火花? Spark 版本:1.5.0-cdh5.5.1 我也尝试将其更改为 String,但仍然出现相同的错误。 【参考方案1】:

如果你修复了你的案例类,那么它应该可以工作:

scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee

scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24

scala> txtRDD.mapcase Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3).toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

否则您可以将String 转换为Int

scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee

scala> val df = txtRDD.mapcase Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3).toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]

scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

但是最好的解决方案是使用spark-csv(这会将薪水也视为Int)。

还请注意,当您运行 df.show 时会引发错误,因为在此之前所有内容都在延迟评估。 df.show 是一个动作,它将导致所有排队的转换执行(请参阅this article 了解更多信息)。

【讨论】:

感谢@evan058 的回复。我尝试了您的解决方案,但仍然遇到相同的错误。 scala&gt; case class employee(eid: String, name: String, salary: String, destination: String) defined class employeescala&gt; val csv =sc.textFile("employee_data.txt").map(line =&gt; line.split(",").map(elem =&gt; elem.trim)) csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at &lt;console&gt;:31scala&gt; val df=csv.mapcase Array(s0, s1, s2, s3) =&gt; employee(s0, s1, s2, s3).toDF() df:..spark.sql.DataFrame=[eid: string, name: string, salary: string, destination: string scala&gt; df.show 16/12/30 15:37:25 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 11, hostname): scala.MatchError: [Ljava.lang.String;@3297e00f (of class [Ljava.lang.String;) at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(&lt;console&gt;:35) at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(&lt;console&gt;:35) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) @Novice 尝试运行csv.foreachprintln。那看起来像什么? 我刚刚运行它,但它没有打印任何东西。 scala&gt; csv.foreachprintln空行scala&gt; 什么都没有?你确定你的文本文件加载正确吗?【参考方案2】:

在数组元素上使用映射,而不是在数组上:

val csv = sc.textFile("employee_data.txt")
    .map(line => line
                     .split(",")
                     .map(e => e.map(_.trim))
     )
val df = csv.map  case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) .toDF()

但是,为什么要读取 CSV,然后将 RDD 转换为 DF? Spark 1.5 已经可以通过spark-csv 包读取 CSV:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", ";") 
    .load("employee_data.txt")

【讨论】:

感谢 Gaweda 的回复。我可以使用,但我有一个要求,我可以使用逗号以外的其他分隔符获取文件。所以我不能使用spark-csv @Novice 有参数delimiter - 您可以将其设置为; 或其他一些 哦,我不知道。谢谢让我搜索更多。 .load(...) 中你提到了employee_data.txt。如果你有一个 RDD 而不是 txt 文件怎么办?【参考方案3】:

正如您在评论中所说,您的案例类员工(应命名为 Employee)接收 Int 作为其构造函数的第一个参数,但您传递的是 String。因此,您应该在实例化或修改将eid 定义为String 的案例之前将其转换为Int

【讨论】:

以上是关于RDD[Array[String]] 到数据框的主要内容,如果未能解决你的问题,请参考以下文章

将包含 BigInt 的 RDD 转换为 Spark Dataframe

获取 RDD[Array[String]] 的一列并将其转换为数据集/数据帧

rdd对象中的数据框参数太多

Scala和Spark,rdd从字典创建数据框

Spark:以 ORC 格式保存数据框

如何通过 Delimiter 拆分 Spark RDD 的行