Spark Structured Streaming: Schema Inference in Scala

Posted: 2020-08-04 16:47:56

【Question】:

I am trying to infer a dynamic JSON schema from a Kafka topic. I found this code in a blog post; it infers the schema using PySpark:

from pyspark.sql.functions import col, expr

def read_kafka_topic(topic):

    # Batch-read the whole topic and keep only the latest record per key
    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r"))
               .select("r.value"))

    # Let the batch JSON reader infer the schema from the raw strings
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)

    return df_read  # the blog presumably returns the inferred DataFrame

I tried this in Scala:

val df_read = spark.read.json(df_json.rdd.map(x => x))

But I ran into an error:

cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) val df_read = spark.read.json(df_json.rdd.map(x=>x))

Is there any way to do this? Please help.


【Answer 1】:

Structured Streaming does not support RDDs.

Structured Streaming does not allow schema inference; the schema needs to be defined explicitly.

For example, for a file source:

val dataSchema = "Recorded_At timestamp, Device string, Index long, Model string, User string, _corrupt_record String, gt string, x double, y double, z double"
val dataPath = "dbfs:/mnt/training/definitive-guide/data/activity-data-stream.json"

val initialDF = spark
  .readStream                             // Returns DataStreamReader
  .option("maxFilesPerTrigger", 1)        // Force processing of only 1 file per trigger 
  .schema(dataSchema)                     // Required for all streaming DataFrames
  .json(dataPath)                         // The stream's source directory and file type
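
(Side note: for file sources specifically, the Structured Streaming guide documents an opt-in escape hatch that re-enables inference for ad-hoc exploration; a Kafka source still needs an explicit schema plus from_json, as in the next example.)

// Ad-hoc/exploration only: re-enable schema inference for file-based
// streaming sources (spark.sql.streaming.schemaInference is off by default).
spark.conf.set("spark.sql.streaming.schemaInference", "true")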

And for the Kafka case, as taught in the Databricks course:

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

val kafkaServer = "server1.databricks.training:9092"  // US (Oregon)
// kafkaServer = "server2.databricks.training:9092"   // Singapore

val editsDF = spark.readStream                        // Get the DataStreamReader
  .format("kafka")                                    // Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafkaServer)     // Configure the Kafka server name and port
  .option("subscribe", "en")                          // Subscribe to the "en" Kafka topic 
  .option("startingOffsets", "earliest")              // Rewind stream to beginning when we restart notebook
  .option("maxOffsetsPerTrigger", 1000)               // Throttle Kafka's processing of the streams
  .load()                                             // Load the DataFrame
  .select($"value".cast("STRING"))                    // Cast the "value" column to STRING

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType}

lazy val schema = StructType(List(
  StructField("channel", StringType, true),
  StructField("comment", StringType, true),
  StructField("delta", IntegerType, true),
  StructField("flag", StringType, true),
  StructField("geocoding", StructType(List(            //  (OBJECT): Added by the server, field contains IP address geocoding information for anonymous edit.
    StructField("city", StringType, true),
    StructField("country", StringType, true),
    StructField("countryCode2", StringType, true),
    StructField("countryCode3", StringType, true),
    StructField("stateProvince", StringType, true),
    StructField("latitude", DoubleType, true),
    StructField("longitude", DoubleType, true)
  )), true),
  StructField("isAnonymous", BooleanType, true),
  StructField("isNewPage", BooleanType, true),
  StructField("isRobot", BooleanType, true),
  StructField("isUnpatrolled", BooleanType, true),
  StructField("namespace", StringType, true),           //   (STRING): Page's namespace. See https://en.wikipedia.org/wiki/Wikipedia:Namespace 
  StructField("page", StringType, true),                //   (STRING): Printable name of the page that was edited
  StructField("pageURL", StringType, true),             //   (STRING): URL of the page that was edited
  StructField("timestamp", TimestampType, true),        //   (STRING): Time the edit occurred, in ISO-8601 format
  StructField("url", StringType, true),
  StructField("user", StringType, true),                //   (STRING): User who made the edit or the IP address associated with the anonymous editor
  StructField("userURL", StringType, true),
  StructField("wikipediaURL", StringType, true),
  StructField("wikipedia", StringType, true)            //   (STRING): Short name of the Wikipedia that was edited (e.g., "en" for the English)
))

import org.apache.spark.sql.functions.from_json

val jsonEdits = editsDF.select(
  from_json($"value", schema).as("json")) 
...
...
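
As for the compile error in the question: spark.read.json in Scala has no overload for RDD[Row]. It accepts a path, a Dataset[String], or (deprecated since Spark 2.2) an RDD[String]. Below is a minimal sketch of the one-off batch inference, assuming df_json is the batch DataFrame from the question with its string value column, and Spark 2.4+ for toDDL:

import spark.implicits._  // brings in the String encoder required by .as[String]

// Convert the single string column to a Dataset[String] before inferring;
// spark.read.json cannot be applied to an RDD[Row].
val inferredSchema = spark.read
  .json(df_json.select("value").as[String])
  .schema

println(inferredSchema.toDDL)  // a DDL string that can be reused in .schema(...)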

【Discussion】:

Thanks for the quick update. But this blog says schema inference is possible in PySpark, please take a look: keestalkstech.com/2019/11/kafka-spark-and-schema-inference

Well, because of COVID I took the Databricks course too, and this is their view. You are also asking about Scala; I'll take a look.

Interesting, but that is not the way the Databricks people taught me.

I think that article is about streaming, not Structured Streaming.

In that sense it is a poorly defined article.
