Losing entries when inner-joining data to a left-joined DataFrame in Spark Structured Streaming

Posted: 2021-02-06 17:27:37

Question:

I am trying to join data with a DataFrame that is itself the result of a left join. In batch processing this works as expected, but in stream processing some entries are lost...

Below is a minimal example of a "session" that has "start" and "end" events and, optionally, some "metadata".

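The script produces two outputs. sessionStartsWithMetadata is the result of left-joining the "start" events with the "metadata" events on sessionId. A left join is used because we want an output event even when no corresponding metadata exists.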
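The left-outer-join semantics of this first step can be illustrated without Spark. The following plain-Scala sketch applies the same condition (same sessionId, start time within ±1 second of the metadata time, bounds inclusive as with `between`) to the question's example data. `StartEvent`, `MetadataEvent` and `leftJoinStartsWithMetadata` are hypothetical names, and this only models the result, not how Spark executes the join.

```scala
// Plain-Scala model of the left outer join in process(); no Spark involved.
// Timestamps are epoch milliseconds, taken from the question's example data.
case class StartEvent(ts: Long, sessionId: Int)
case class MetadataEvent(ts: Long, sessionId: Int)

val starts = Vector(
  StartEvent(1L, 0), StartEvent(2000L, 1), StartEvent(2000L, 2), StartEvent(20000L, 10)
)
val metadata = Vector(
  MetadataEvent(1L, 0), MetadataEvent(2000L, 2), MetadataEvent(20000L, 10) // none for session 1
)

// Left outer join: keep every start event; attach each metadata event with the same
// sessionId whose timestamp is within +/- toleranceMs (inclusive, like `between`),
// or None when nothing matches.
def leftJoinStartsWithMetadata(
    starts: Seq[StartEvent],
    metadata: Seq[MetadataEvent],
    toleranceMs: Long = 1000L
): Seq[(Int, Long, Option[Long])] =
  starts.flatMap { s =>
    val matches = metadata.filter { m =>
      m.sessionId == s.sessionId &&
      s.ts >= m.ts - toleranceMs && s.ts <= m.ts + toleranceMs
    }
    if (matches.isEmpty) Seq((s.sessionId, s.ts, Option.empty[Long]))
    else matches.map(m => (s.sessionId, s.ts, Option(m.ts)))
  }

val startsWithMetadata = leftJoinStartsWithMetadata(starts, metadata)
startsWithMetadata.foreach(println)
```

Session 1 comes out as `(1,2000,None)`, which corresponds to the `null` metadata column in the batch output.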

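A second DataFrame, endedSessionsWithMetadata, is created by joining the "end" events to the previously created DataFrame. An inner join is used here because we only want output once a session has definitely ended.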
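Likewise, a plain-Scala model of the second step (hypothetical helper `endedSessions`, not Spark code) checks the inner-join condition by hand. With batch semantics, session 1 should match its end event, because 11000 - 10000 = 1000 <= 2000 <= 11000.

```scala
// Plain-Scala model of the inner join in process(); no Spark involved.
// Left side: (sessionId, startTs, metadataTs) rows as the batch left join yields them.
val startsWithMetadata: Seq[(Int, Long, Option[Long])] = Vector(
  (0, 1L, Some(1L)),
  (1, 2000L, None), // session 1 has no metadata
  (2, 2000L, Some(2000L)),
  (10, 20000L, Some(20000L))
)
// Right side: (endTs, sessionId), as in the question's sessionEndData (epoch ms).
val endEvents: Seq[(Long, Int)] = Vector((10000L, 0), (11000L, 1), (12000L, 2), (30000L, 10))

// Inner join: same sessionId and startTs between endTs - windowMs and endTs (inclusive).
def endedSessions(
    starts: Seq[(Int, Long, Option[Long])],
    ends: Seq[(Long, Int)],
    windowMs: Long = 10000L
): Seq[(Int, Long, Option[Long], Long)] =
  for {
    (id, startTs, metaTs) <- starts
    (endTs, endId) <- ends
    if id == endId && startTs >= endTs - windowMs && startTs <= endTs
  } yield (id, startTs, metaTs, endTs)

val ended = endedSessions(startsWithMetadata, endEvents)
ended.foreach(println)
```

All four sessions match, as in the batch output of endedSessionsWithMetadata.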

The following code can be run in spark-shell:

import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless of whether batch or stream processing is used
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
        sessionStartEvents("sessionStartTimestamp").between(
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)

  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")

  val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)

  val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
    .toDS()
    .toDF("sessionOptionalMetadataTimestamp", "sessionId")
    .withWatermark("sessionOptionalMetadataTimestamp", "1 second")

  val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionEndEventsStream.addData(sessionEndData)

  val sessionEndEvents: DataFrame = sessionEndEventsStream
    .toDS()
    .toDF("sessionEndTimestamp", "sessionId")
    .withWatermark("sessionEndTimestamp", "1 second")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
    .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
    .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}

def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
  val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
  val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  println("sessionStartsWithMetadata")
  sessionStartsWithMetadata.show(100, truncate = false)

  println("endedSessionsWithMetadata")
  endedSessionsWithMetadata.show(100, truncate = false)
}


// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
  (new Timestamp(1), 0),
  (new Timestamp(2000), 1),
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionOptionalMetadata = Vector(
  (new Timestamp(1), 0),
  // session `1` has no metadata
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionEndData = Vector(
  (new Timestamp(10000), 0),
  (new Timestamp(11000), 1),
  (new Timestamp(12000), 2),
  (new Timestamp(30000), 10)
)

batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

In the example, the session with ID 1 has no metadata, so the corresponding metadata column is null.

The main join logic is implemented in def process(…), which is called with both batch and streaming data.

In the batch version, the output is as expected:

sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+            
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        |  ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+

But when the same processing runs as a stream, the output of endedSessionsWithMetadata does not contain an entry for session 1, which has no metadata:

-------------------------------------------                                     
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------                                     
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------                                     
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------                                     
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
  ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
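One way to frame the missing row is a deliberately simplified model of the streaming run. It assumes (these are simplifications, not Spark's actual implementation) a single global watermark equal to the minimum over all inputs of (max event time seen - delay), and that a downstream stateful operator treats rows whose event time is below that watermark as late. A left outer join only emits its null-padded rows once the watermark guarantees that no match can still arrive, which fits session 1 appearing only in Batch 1 of sessionStartsWithMetadata. The helper `survivesDownstreamJoin` is hypothetical.

```scala
// Simplified model (NOT Spark's implementation) of the streaming run.
// Assumptions: one global watermark = min over inputs of (max event time - delay),
// and a stateful operator drops input rows whose event time is below it.
val delayMs = 1000L
// All data is added before the queries start, so batch 0 sees every event.
val maxEventTimeAfterBatch0 = Seq(20000L, 20000L, 30000L) // start, metadata, end

// Batch 0 runs with the initial watermark (0); it only advances afterwards.
val watermarkBatch0 = 0L
val watermarkBatch1 = maxEventTimeAfterBatch0.map(_ - delayMs).min

// Session 1's null-padded row (start at 2000 ms) is emitted by the left outer join
// only in batch 1, once no metadata within +/- 1 s can still arrive.
val session1StartTs = 2000L
def survivesDownstreamJoin(eventTs: Long, watermark: Long): Boolean = eventTs >= watermark

val keptIfEmittedInBatch0 = survivesDownstreamJoin(session1StartTs, watermarkBatch0)
val keptInBatch1 = survivesDownstreamJoin(session1StartTs, watermarkBatch1)

println(s"batch 1 watermark = $watermarkBatch1 ms, session 1 row kept: $keptInBatch1")
```

In this model the row would survive if it were emitted in batch 0, but by batch 1 the watermark (19000 ms) is far past its event time of 2000 ms.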


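Can anyone explain why, in stream processing, the "end" event for the session without "metadata" (sessionId=1) is missing? What do I need to do to make it appear in the output?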

Many thanks!

Comments on the question:

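What effect does a 5-second watermark have? I'm on holiday, so I won't dig too deep.

@thebluephantom Thanks for looking! Unfortunately, a 5-second watermark does not change the result.

My comment above is invalid. Back from holidays. Ran it, and I'm a bit curious. Checking the manual; I'll do some experiments tomorrow.

This returns nothing: ... sessionEndEvents("sessionEndTimestamp"), sessionEndEvents("sessionEndTimestamp").plus(expr(s"INTERVAL 10 seconds"))

Answer 1: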

After a lot of testing, looking around, and re-reading the manual:

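This must be a bug in Spark. I also noticed this thread circulating: https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E and, although I understand the distinction between global and chained stream-stream joins, IMO this points to a problem with this kind of processing. I ran it on Spark Databricks 3.x, to no avail.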
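The idea of a single, global watermark shared by chained operators, which the answer alludes to, can be illustrated with a small calculation. When a query has several watermarked inputs, Spark combines their watermarks according to `spark.sql.streaming.multipleWatermarkPolicy` (`min` by default, or `max`). The sketch below uses the question's maximum event times, the 1-second delay from the question, the 5-second delay tried in the comments, and the simplifying assumption (not Spark's exact rule) that a row is late when its event time is below the global watermark. The helper `globalWatermark` is hypothetical.

```scala
// Simplified model: per-input watermark = max event time seen - delay (epoch ms).
// Max event times after the first batch of the question's data.
val maxEventTimes = Map("start" -> 20000L, "metadata" -> 20000L, "end" -> 30000L)
val session1StartTs = 2000L // event time of the null-padded row for session 1

// Combine per-input watermarks the way the "min" / "max" policies describe.
def globalWatermark(maxTimes: Map[String, Long], delayMs: Long, policy: String): Long = {
  val perInput = maxTimes.values.map(_ - delayMs)
  policy match {
    case "min" => perInput.min
    case "max" => perInput.max
    case other => throw new IllegalArgumentException(s"unknown policy: $other")
  }
}

val results = for {
  delayMs <- Seq(1000L, 5000L)
  policy  <- Seq("min", "max")
} yield {
  val wm = globalWatermark(maxEventTimes, delayMs, policy)
  // In this model the row survives only if it is not older than the watermark.
  (delayMs, policy, wm, session1StartTs >= wm)
}

results.foreach(println)
```

Every combination yields a watermark between 15000 and 29000 ms, far above session 1's start time of 2000 ms, which is consistent with the comment that a 5-second watermark did not change the result.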

Comments:

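Thanks for the mailing-list thread. Unfortunately, this seems to be a limitation of Spark.

But that is not apparent from the manual, although its examples only show a JOIN of two DataFrames. Anyway, we are both out of ideas now. Good question in any case.

Based on the above, a Spark ticket has been created: issues.apache.org/jira/browse/SPARK-33259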
