Spark 无法识别提供给 withWatermark() 的事件时间列
Posted
技术标签:
【中文标题】Spark 无法识别提供给 withWatermark() 的事件时间列【英文标题】:Spark can't identify the event time column being supplied to withWatermark() 【发布时间】:2018-06-07 17:32:56 【问题描述】:我试图找到一种方法,使用 Spark 根据事件时间戳对不同事件中的功能进行重新会话,我发现了一个代码示例,该示例使用 mapGroupsWithState
在其存储库中使用处理时间戳来重新会话事件。
https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
为了快速测试这个会话化东西是否适用于事件时间戳,我添加了 withWatermark("timestamp", "10 seconds") (将处理时间视为事件时间戳)并将ProcessingTimeTimeout
更改为EventTimeTimeout
。
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", value = true)
.load()
// Split the lines into words, treat words as sessionId of events
val events = lines
.withWatermark("timestamp", "10 seconds") // added
.as[(String, Timestamp)]
.flatMap case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout)
...
// Start running the query that prints the session updates to the console
val query = sessionUpdates
.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
但是,当我运行它时,Spark 抛出了org.apache.spark.sql.AnalysisException
并说
必须在查询中使用“[Dataset/DataFrame].withWatermark()”指定水印,以便在 [map|flatMap]GroupsWithState 中使用事件时间超时。不支持不带水印的事件时间超时`
这不是真的且令人困惑,因为那个“时间戳”列显然是在该异常消息之后的物理计划中:
...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]
我是否错过了什么或做错了什么?
提前致谢!
【问题讨论】:
【参考方案1】:在平面图操作后添加水印。 这应该有效:
val events = lines
.as[(String, Timestamp)]
.flatMap case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
.withWatermark("timestamp", "10 seconds")
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout)
...
【讨论】:
以上是关于Spark 无法识别提供给 withWatermark() 的事件时间列的主要内容,如果未能解决你的问题,请参考以下文章
使用 BouncyCastle 生成的证书作为服务器进行身份验证时出现“无法识别提供给包的凭据”错误