在 2.0 中将 RDD 转换为 Dataframe
Posted
技术标签:
【中文标题】在 2.0 中将 RDD 转换为 Dataframe【英文标题】:convert RDD to Dataframe in 2.0 【发布时间】:2016-11-16 19:24:44 【问题描述】:我正在尝试将 rdd 转换为 Spark2.0 中的数据帧
val conf=new SparkConf().setAppName("dataframes").setMaster("local")
val sc=new SparkContext(conf)
val sqlCon=new SQLContext(sc)
import sqlCon.implicits._
val rdd=sc.textFile("/home/cloudera/alpha.dat").persist()
val row=rdd.first()
val data=rdd.filter x => !x.contains(row)
data.foreach x => println(x)
case class person(name:String,age:Int,city:String)
val rdd2=data.map x => x.split(",")
val rdd3=rdd2.map x => person(x(0),x(1).toInt,x(2))
val df=rdd3.toDF()
df.printSchema();
df.registerTempTable("alpha")
val df1=sqlCon.sql("select * from alpha")
df1.foreach x => println(x)
但我在 toDF() 处出现错误。 --->“val df=rdd3.toDF()”
Multiple markers at this line:
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case
classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
- Implicit conversion found: rdd3 ⇒ rddToDatasetHolder(rdd3): (implicit evidence$4:
org.apache.spark.sql.Encoder[person])org.apache.spark.sql.DatasetHolder[person]
如何使用 toDF() 将上述内容转换为 Dataframe
【问题讨论】:
【参考方案1】:Cloudera 和 Spark 2.0?嗯,不认为我们还支持 :)
无论如何,首先你不需要在你的 RDD 上调用.persist()
,这样你就可以删除那个位。其次,由于Person
是一个案例类,因此您应该将其名称大写。
最后,在 Spark 2.0 中,您不再调用 import sqlContext.implicits._
来隐式构建 DataFrame
架构,现在调用 import spark.implicits._
。您的错误消息暗示了这一点。
【讨论】:
谢谢埃里克。我正在关注spark.apache.org/docs/2.0.0-preview/… 将 RDD 转换为 Dataframes 导入 sqlContext.implicits._,我们可以在 2.0 中使用。看起来问题出在编码器上 我在 pom.xml 中提供的以下依赖项有一个简单的错误,我在 main 方法中定义了 case 类。删除相同的内容后,我可以将 RDD 转换为 DataFrame。
package sparksql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
import org.apache.spark.SparkContext
object asw
case class Person(name:String,age:Int,city:String)
def main(args: Array[String]): Unit =
val conf=new SparkConf().setMaster("local").setAppName("Dataframe")
val sc=new SparkContext(conf)
val spark=SparkSession.builder().getOrCreate()
import spark.implicits._
val rdd1=sc.textFile("/home/cloudera/alpha.dat")
val row=rdd1.first()
val data=rdd1.filter x => !x.contains(row)
val rdd2=data.map x => x.split(",")
val df=rdd2.map x => Person(x(0),x(1).toInt,x(2)) .toDF()
df.createOrReplaceTempView("rdd21")
spark.sql("select * from rdd21").show()
【讨论】:
以上是关于在 2.0 中将 RDD 转换为 Dataframe的主要内容,如果未能解决你的问题,请参考以下文章
如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?
如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?