如何使用用户定义的类和 toDF 将 RDD 转换为数据帧

Posted

技术标签:

【中文标题】如何使用用户定义的类和 toDF 将 RDD 转换为数据帧【英文标题】:How to convert an RDD into a dataframe using a user defined class and toDF 【发布时间】:2019-05-27 15:49:15 【问题描述】:

当我尝试通过sbt package创建以下包时:

import org.apache.spark.sql.SparkSession

class Log(val cip: String, val scstatus: Int) 
    var src: String = cip
    var status: Int = scstatus


object IISHttpLogs 
  def main(args: Array[String]) 
    val logFiles = "D:/temp/tests/wwwlogs" 
    val spark = SparkSession.builder.appName("LogParser").getOrCreate()
    val sc = spark.sparkContext;
    sc.setLogLevel("ERROR")

    val logs = sc.textFile(logFiles)        

    import spark.implicits._
    val rowDF = logs.filter(l => !l.startsWith("#"))
        .map(l => l.split(" "))
        .map(c => new Log(c(8), c(11).trim.toInt))
        .toDF();
    println(s"line count: $rowDF.count()")        
    rowDF.createOrReplaceTempView("rows")
    val maxHit = spark.sql("SELECT top 1 src, count(*) FROM rows group by src order by count(*) desc")
    maxHit.show()

    spark.stop()
  

我收到以下错误:

值 toDF 不是 org.apache.spark.rdd.RDD[Log] 的成员

我尝试了几件事,例如:

到DFlog 创建一个 sql 上下文并从此 sqlContext 导入 imlicits._

我只是无法编译我的代码。

欢迎提供任何线索来覆盖此错误。


我很好地阅读了Generate a Spark StructType / Schema from a case class 并写道:

val schema =
    StructType(
        StructField("src", StringType, false) ::
        StructField("status", IntegerType, true) :: Nil)

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => Row(c(8), c(11).trim.toInt));

val rowDF = spark.sqlContext.createDataFrame(rowRDD, schema); 

但是这样做我不使用Log 类。我想知道是否有办法通过使用定义的Log 类来获取DataFrame,或者官方/最佳方法是否是使用Row 类?

例如我不会写:

val rowRDD = logs.filter(l => !l.startsWith("#"))
    .map(l => l.split(" "))
    .map(c => new Log(c(8), c(11).trim.toInt));
val rowDF = spark.sqlContext.createDataFrame(
    rowRDD,
    ScalaReflection.schemaFor[Log].dataType.asInstanceOf[StructType]);

我就是不知道为什么?

【问题讨论】:

How to convert rdd object to dataframe in spark的可能重复 【参考方案1】:

您必须使用案例类。至少这对我有用:

case class Log(cip: String,  scstatus: Int)
//...
.map(c =>  Log(c(8), c(11).trim.toInt) // ommit 'new'
.toDF()

我不太确定这是否是一般规则。但是在 Dataset API 的公告中,明确提到了案例类的使用:

Spark 1.6 支持为各种类型自动生成编码器,包括原始类型(例如 String、Integer、Long)、Scala 案例类和 Java Bean。 (https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html)

如果不能使用案例类,this answer 似乎比较合适。

【讨论】:

我正在使用 scala 2.11.12,即使使用案例类我也会遇到同样的错误。 您是否尝试将案例类放入“IISHttpLogs”对象中? 是的,我也试试这个。我认为 sbt 文件中缺少导入或 libraryDependencies 其实我好像没试过用对象外的case类...

以上是关于如何使用用户定义的类和 toDF 将 RDD 转换为数据帧的主要内容,如果未能解决你的问题,请参考以下文章

将 rdd 转换为数据框:AttributeError: 'RDD' object has no attribute 'toDF' using PySpark

如何将 RDD[Row] 转换为 RDD[Vector]

值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员

toDF() 不处理 RDD

如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?

如何在 ipython 中将 Spark RDD 转换为 pandas 数据帧?