如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源

Posted

技术标签:

【中文标题】如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源【英文标题】:How to use Scala Case Class to map Kafka source in Spark Structured Streaming 【发布时间】:2018-07-02 10:07:04 【问题描述】:

我正在尝试在 spark 中使用结构化流,因为它非常适合我的用例。但是,我似乎找不到将来自 Kafka 的传入数据映射到案例类的方法。根据官方文档,这是我可以走多远。

import sparkSession.sqlContext.implicits._                          
val kafkaDF:DataFrame = sparkSession
                                          .readStream
                                          .format("kafka")
                                          .option("kafka.bootstrap.servers", bootstrapServers_CML)
                                          .option("subscribe", topics_ME)
                                          .option("startingOffsets", "latest")
                                          .load()
                                          .selectExpr("cast (value as string) as json") //Kakfa sends data in a specific schema (key, value, topic, offset, timestamp etc)    

val schema_ME = StructType(Seq(
    StructField("Parm1", StringType, true),
    StructField("Parm2", StringType, true),
    StructField("Parm3", TimestampType, true)))  

val mobEventDF:DataFrame = kafkaDF
                         .select(from_json($"json", schema_ME).as("mobEvent")) //Using a StructType to convert to application specific schema. Cant seem to use a case class for schema directly yet. Perhaps with later API??
                         .na.drop()

mobEventDF 有这样的模式

root
 |-- appEvent: struct (nullable = true)
 |    |-- Parm1: string (nullable = true)
 |    |-- Parm2: string (nullable = true)
 |    |-- Parm3: string (nullable = true)

有没有更好的方法来做到这一点?如何将其直接映射到如下所示的 Scala Case 类中?

case class ME(name: String, 
                 factory: String,
                 delay: Timestamp)

【问题讨论】:

【参考方案1】:

选择并重命名所有字段,然后调用as方法

kafkaDF.select($"mobEvent.*").toDF("name", "factory", "delay").as[ME]

【讨论】:

谢谢。这绝对有效。但是有没有办法直接指定案例类呢?在上述方法中,我必须指定两次相同的架构,一个结构,然后一个案例类。 你可以使用这个:***.com/questions/36746055/…

以上是关于如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源的主要内容,如果未能解决你的问题,请参考以下文章

scala伴生对象与case class用法

scala伴生对象与case class用法

Scala class和case class的区别

Scala class和case class的区别

Scala 之 Case Class

scala case class