错误:值 toDF 不是 org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String

Posted

技术标签:

【中文标题】错误:值 toDF 不是 org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] 的成员【英文标题】:error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] 【发布时间】:2018-10-24 08:49:18 【问题描述】:

我正在尝试使用 Scala 中的 sparkStreaming 捕获 Kafka 事件(我以序列化形式获取)。

这是我的代码-sn-p:

val spark = SparkSession.builder().master("local[*]").appName("Spark-Kafka-Integration").getOrCreate()
spark.conf.set("spark.driver.allowMultipleContexts", "true")

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val topics=Set("<topic-name>")
val brokers="<some-list>"
val groupId="spark-streaming-test"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "auto.offset.reset" -> "earliest",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> groupId,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val messages: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

messages.foreachRDD  rdd =>
  println(rdd.toDF())


ssc.start()
ssc.awaitTermination()

我收到如下错误消息: 错误:(59, 19) value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] println(rdd.toDF())

【问题讨论】:

【参考方案1】:

toDF 来自DatasetHolder

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits

我没有复制它,但我的猜测是ConsumerRecord[String, String] 没有编码器,因此您可以提供一个编码器或首先将其映射到可以派生Encoder 的东西(案例类或原语)

由于 spark 的分布式特性,foreachRDD 中的 println 也可能不会按照您想要的方式运行

【讨论】:

谢谢。由于我是新手,因此无法理解太多。我替换了代码并尝试直接打印 RDD,如下所示:messages.foreachRDD rdd => println("Messages: " + rdd) 它成功运行,输出为: Messages: KafkaRDD[1] at createDirectStream at kafkaStream.scala:51我想看到的是这些 RDD 背后的数据。 很适合可能无限流,但有点棘手。您可以将数据转换为人类可读的格式并保存到例如HDFS 并在那里查看。 println 在 spark 中很少有用,除非您将数据具体化到驱动程序节点。如何将信息提取为可读格式取决于您的 kafkaconfic 的外观,但很可能您希望使用您感兴趣的字段创建一些案例类,并将 ConsumerRecord 转换为该案例类 Generall 我建议您查看如何spark 和 RDD 在后台工作以及它如何分配计算 谢谢,我把代码改成了:messages.foreachRDD rdd => val dataFrame = rdd.map(row => row.value()).toDF() dataFrame.show(10) 结果为:+--------------------+ |价值| +--------------------+ |��srHcom.egenc...| |��srPcom.egenc...| |��srPcom.egenc...| +--------------------+ 只显示前 10 行它现在正在工作。只需要反序列化这些数据以使其可读。我在代码中使用了 value.deserializer,但它不起作用。你对如何做到这一点有什么建议吗? val df = messages.map( _.value ) 那么一切都是一样的。

以上是关于错误:值 toDF 不是 org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String的主要内容,如果未能解决你的问题,请参考以下文章

值 toDF 不是成员 org.apache.spark.rdd.RDD

值 toDF 不是 org.apache.spark.rdd.RDD 的成员

在 Spark 中创建数据帧时出错

toDF 不是 Seq 的成员,toDS 不是 Seq 的成员

RDD toDF() :错误行为

toDF() 不处理 RDD