在 Spark 中创建数据帧时出错

Posted

技术标签:

【中文标题】在 Spark 中创建数据帧时出错【英文标题】:Error creating dataframes in spark 【发布时间】:2017-09-27 07:01:05 【问题描述】:

我正在尝试在 kafka-spark 流中创建一个数据框,我已经成功地将值映射到案例类,但是每当我调用 toDF 方法时它都会给我错误。 **

值 toDF 不是 Array[WeatherEvent] [error] 可能的成员 原因:可能在“value toDF”之前缺少分号? [错误] ).toDF("经度", "纬度", "国家", "日出", "日落", “温度”、“温度最小值”、“温度最大值”、[错误] ^ [错误] 发现一个错误 [错误] (compile:compileIncremental) 编译失败 [错误] 总时间:2 秒,2017 年 9 月 27 日完成 上午 11:49:23

这是我的代码

 val inputStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String ](Array("test"), kafkaParams))
 //  val json = parse(inputStream)


  val processedStream = inputStream
  .flatMap(record => record.value.split(" ").map(payload => 
        //val ts = Timestamp.valueOf(payload(3))
        WeatherEvent(payload(0).toDouble, payload(1).toDouble, payload(2).toString , payload(3).toInt,
                    payload(4).toInt, payload(5).toDouble, payload(6).toDouble, payload(7).toDouble, 
                    payload(8).toDouble, payload(9).toInt, payload(10).toInt, payload(11).toInt, 
                    payload(12).toDouble, payload(13).toDouble)
      ).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax", 
              "pressure", "humidity", "cloudiness", "id", "wind_speed", "wind_deg")
 )

谢谢**

【问题讨论】:

我猜 WeatherEvent 是一个案例类。如果那是正确的,那么您不需要在 toDF 中提供标题名称,只需执行 .toDF 就足够了。 @RameshMaharjan 谢谢,但它仍然一样。 value toDF 不是 Array[WeatherEvent] [error] 的成员可能的原因:可能在 `value toDF' 之前缺少分号? [error] ).toDF() [error] ^ [error] 发现一个错误 @matesio 你可以尝试导入这个import ssc.implicits._ 是的。我同意 Akash 的观点,即您需要导入implicits._ 以应用 .toDF 是的,我试过 ** val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ ** 【参考方案1】:

toDF() 是 sqlContext 中定义的隐式方法。 toDF() 用于将 RDD 转换为 Dataframe。在这里,您从 Kafka 获取流,我的意思是 Dstreams。要将其转换为 DF,您需要使用 transform API 或 foreachRDD API 处理 Dstreams 中的每个 RDD。下面我使用foreachRDD转换将RDD转换为Dataframe

val data=KafkaUtils.createStream(ssc, zkQuorum, "GroupName", topics).map(x=>x._2)
val lines12=data.foreachRDD(x=>
  val df=x.flatMap(x => x.split(",")).map(x=>(x(0),x(1))).toDF()

【讨论】:

以上是关于在 Spark 中创建数据帧时出错的主要内容,如果未能解决你的问题,请参考以下文章

使用Scala在Spark中创建DataFrame时出错

使用 PySpark 写入数据帧时出错

在 Spring Boot 中创建数据源时出错

在列表中创建 Spark 数据框后如何使用它们?

在 Cloudera Impala(虚拟机)中创建数据库时出错

在 bigquery 中创建数据集时,谷歌 App 引擎出错