Spark 无法识别提供给 withWatermark() 的事件时间列

Posted

技术标签:

【中文标题】Spark 无法识别提供给 withWatermark() 的事件时间列【英文标题】:Spark can't identify the event time column being supplied to withWatermark() 【发布时间】:2018-06-07 17:32:56 【问题描述】:

我试图找到一种方法,使用 Spark 根据事件时间戳对不同事件中的功能进行重新会话,我发现了一个代码示例,该示例使用 mapGroupsWithState在其存储库中使用处理时间戳来重新会话事件。

https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

为了快速测试这个会话化东西是否适用于事件时间戳,我添加了 withWatermark("timestamp", "10 seconds") (将处理时间视为事件时间戳)并将ProcessingTimeTimeout 更改为EventTimeTimeout

  val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

 // Split the lines into words, treat words as sessionId of events
 val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap  case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) 
   ...
  

  // Start running the query that prints the session updates to the console
 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

 query.awaitTermination()

但是,当我运行它时,Spark 抛出了org.apache.spark.sql.AnalysisException 并说

必须在查询中使用“[Dataset/DataFrame].withWatermark()”指定水印,以便在 [map|flatMap]GroupsWithState 中使用事件时间超时。不支持不带水印的事件时间超时`

这不是真的且令人困惑,因为那个“时间戳”列显然是在该异常消息之后的物理计划中:

...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]

我是否错过了什么或做错了什么?

提前致谢!

【问题讨论】:

【参考方案1】:

在平面图操作后添加水印。 这应该有效:

val events = lines
  .as[(String, Timestamp)]
  .flatMap  case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  .withWatermark("timestamp", "10 seconds") 

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) 
   ...
  

【讨论】:

以上是关于Spark 无法识别提供给 withWatermark() 的事件时间列的主要内容,如果未能解决你的问题,请参考以下文章

为啥 spark 无法识别我的“数据框布尔表达式”?

Pyspark 命令无法识别

使用 BouncyCastle 生成的证书作为服务器进行身份验证时出现“无法识别提供给包的凭据”错误

Spark SQL在拆分后无法识别空值

spark 的 approxQuantile 问题,无法识别 List<String>

Kubernetes 集群上的 spark-submit 无法识别 k8s --master 属性