Spark structured streaming: Schema Inference in Scala
Posted: 2020-08-04 16:47:56

Question: I am trying to infer a dynamic JSON schema from a Kafka topic. I found the following code in a blog post, which infers the schema using PySpark:
from pyspark.sql.functions import col, expr

def read_kafka_topic(topic):
    # Batch-read the whole topic, from earliest to latest offset
    df_json = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        # Cast the binary Kafka value to a string and drop tombstones
        .withColumn("value", expr("string(value)"))
        .filter(col("value").isNotNull())
        # Keep only the latest record per key (max offset)
        .select("key", expr("struct(offset, value) r"))
        .groupBy("key").agg(expr("max(r) r"))
        .select("r.value"))

    # Let Spark infer the JSON schema from the raw strings
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)
Attempting the same in Scala:
val df_read = spark.read.json(df_json.rdd.map(x => x))
But I get this error:
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) val df_read = spark.read.json(df_json.rdd.map(x => x))
Is there any way around this? Please help.
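In Scala, spark.read.json accepts a Dataset[String] (or, in older APIs, an RDD[String]) rather than an RDD[Row], which is what the compile error is pointing at. A minimal sketch of the fix, assuming Spark 2.2+ and that df_json has a single string column named value, as in the PySpark code above:

import spark.implicits._

// Cast the single-column DataFrame to Dataset[String] so that the
// json(...) overload that infers a schema can accept it
val df_read = spark.read.json(df_json.as[String])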
Answer 1: Structured Streaming does not support RDDs, and it does not allow schema inference: the schema needs to be defined up front.
For example, for a file source:
val dataSchema = "Recorded_At timestamp, Device string, Index long, Model string, User string, _corrupt_record String, gt string, x double, y double, z double"
val dataPath = "dbfs:/mnt/training/definitive-guide/data/activity-data-stream.json"

val initialDF = spark
  .readStream                      // Returns DataStreamReader
  .option("maxFilesPerTrigger", 1) // Force processing of only 1 file per trigger
  .schema(dataSchema)              // Required for all streaming DataFrames
  .json(dataPath)                  // The stream's source directory and file type
And for the Kafka case, as taught in the Databricks course:
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

val kafkaServer = "server1.databricks.training:9092" // US (Oregon)
// kafkaServer = "server2.databricks.training:9092"  // Singapore

val editsDF = spark.readStream                    // Get the DataStreamReader
  .format("kafka")                                // Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafkaServer) // Configure the Kafka server name and port
  .option("subscribe", "en")                      // Subscribe to the "en" Kafka topic
  .option("startingOffsets", "earliest")          // Rewind stream to beginning when we restart notebook
  .option("maxOffsetsPerTrigger", 1000)           // Throttle Kafka's processing of the streams
  .load()                                         // Load the DataFrame
  .select($"value".cast("STRING"))                // Cast the "value" column to STRING
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType}
lazy val schema = StructType(List(
  StructField("channel", StringType, true),
  StructField("comment", StringType, true),
  StructField("delta", IntegerType, true),
  StructField("flag", StringType, true),
  StructField("geocoding", StructType(List(      // (OBJECT): Added by the server; contains IP-address geocoding information for anonymous edits
    StructField("city", StringType, true),
    StructField("country", StringType, true),
    StructField("countryCode2", StringType, true),
    StructField("countryCode3", StringType, true),
    StructField("stateProvince", StringType, true),
    StructField("latitude", DoubleType, true),
    StructField("longitude", DoubleType, true)
  )), true),
  StructField("isAnonymous", BooleanType, true),
  StructField("isNewPage", BooleanType, true),
  StructField("isRobot", BooleanType, true),
  StructField("isUnpatrolled", BooleanType, true),
  StructField("namespace", StringType, true),    // (STRING): Page's namespace. See https://en.wikipedia.org/wiki/Wikipedia:Namespace
  StructField("page", StringType, true),         // (STRING): Printable name of the page that was edited
  StructField("pageURL", StringType, true),      // (STRING): URL of the page that was edited
  StructField("timestamp", TimestampType, true), // (STRING): Time the edit occurred, in ISO-8601 format
  StructField("url", StringType, true),
  StructField("user", StringType, true),         // (STRING): User who made the edit, or the IP address associated with the anonymous editor
  StructField("userURL", StringType, true),
  StructField("wikipediaURL", StringType, true),
  StructField("wikipedia", StringType, true)     // (STRING): Short name of the Wikipedia that was edited (e.g., "en" for English)
))
import org.apache.spark.sql.functions.from_json

val jsonEdits = editsDF.select(
  from_json($"value", schema).as("json"))
...
...
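From here one would typically work with the parsed json struct column; a possible continuation (hypothetical column picks and sink, not part of the original answer):

// Pull a few typed fields out of the parsed struct and write to the console
val parsedEdits = jsonEdits
  .select($"json.wikipedia", $"json.page", $"json.user", $"json.timestamp")

val query = parsedEdits.writeStream
  .format("console")
  .outputMode("append")
  .start()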
Discussion:
Thanks for the quick update. But this blog says schema inference is possible in PySpark, please take a look: keestalkstech.com/2019/11/kafka-spark-and-schema-inference

Well, because of covid I also took the Databricks course, and this is their take. You are also talking about Scala; I'll have a look.

Interesting, but not the way the Databricks folks taught it to me.

I think that article is about (DStream) streaming rather than Structured Streaming.

In that sense it is a poorly scoped article.
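For what it's worth, the pattern from the linked blog can be reproduced in Scala: infer the schema with a batch read of the topic, then hand that schema to the streaming read. A sketch under those assumptions (broker and topic names reused from the answer above; not part of the original answer):

import spark.implicits._
import org.apache.spark.sql.functions.{col, from_json}

// 1. Batch-read a sample of the topic; batch reads are allowed to infer schemas
val sampleDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("subscribe", "en")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .select(col("value").cast("string").as("value"))

val inferredSchema = spark.read.json(sampleDF.as[String]).schema

// 2. Feed the inferred schema to the stream, which requires one up front
val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("subscribe", "en")
  .load()
  .select(from_json(col("value").cast("string"), inferredSchema).as("json"))

The caveat raised in the discussion still applies: the schema is only as good as the sample it was inferred from, so fields that are absent at inference time will be missing from the stream.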