在 Spark 结构化流中将数据内部连接到左连接 DataFrame 时丢失条目
Posted
技术标签:
【中文标题】在 Spark 结构化流中将数据内部连接到左连接 DataFrame 时丢失条目【英文标题】:Losing entries when inner-joining data to a left-joined DataFrame in Spark Structured Streaming 【发布时间】:2021-02-06 17:27:37 【问题描述】:我正在尝试将数据与 DataFrame 连接起来,而 DataFrame 又是由左连接产生的。虽然在批处理中这按预期工作,但在流处理中一些条目会丢失......
在下面我创建了一个“会话”的最小示例,它具有“开始”和“结束”事件以及可选的一些“元数据”。
脚本生成两个输出:sessionStartsWithMetadata
来自“开始”事件的结果,这些事件与“元数据”事件左连接,基于sessionId
。使用“左连接”,因为即使不存在相应的元数据,我们也希望获得输出事件。
另外一个 DataFrame endedSessionsWithMetadata
是通过将“end”事件连接到先前创建的 DataFrame 来创建的。这里使用了“内连接”,因为我们只需要在会话确定结束时得到一些输出。
这段代码可以在spark-shell
中执行:
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream, StreamingQueryWrapper
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.DataFrame, SQLContext
import org.apache.spark.sql.functions.col, expr, lit
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Main data processing, regardless whether batch or stream processing
def process(
sessionStartEvents: DataFrame,
sessionOptionalMetadataEvents: DataFrame,
sessionEndEvents: DataFrame
): (DataFrame, DataFrame) =
val sessionStartsWithMetadata: DataFrame = sessionStartEvents
.join(
sessionOptionalMetadataEvents,
sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") &&
sessionStartEvents("sessionStartTimestamp").between(
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
),
"left" // metadata is optional
)
.select(
sessionStartEvents("sessionId"),
sessionStartEvents("sessionStartTimestamp"),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
)
val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
sessionEndEvents,
sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") &&
sessionStartsWithMetadata("sessionStartTimestamp").between(
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
sessionEndEvents("sessionEndTimestamp")
)
)
(sessionStartsWithMetadata, endedSessionsWithMetadata)
def streamProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) =
val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionStartEventsStream.addData(sessionStartData)
val sessionStartEvents: DataFrame = sessionStartEventsStream
.toDS()
.toDF("sessionStartTimestamp", "sessionId")
.withWatermark("sessionStartTimestamp", "1 second")
val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
.toDS()
.toDF("sessionOptionalMetadataTimestamp", "sessionId")
.withWatermark("sessionOptionalMetadataTimestamp", "1 second")
val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionEndEventsStream.addData(sessionEndData)
val sessionEndEvents: DataFrame = sessionEndEventsStream
.toDS()
.toDF("sessionEndTimestamp", "sessionId")
.withWatermark("sessionEndTimestamp", "1 second")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
.select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
.select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
(sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
def batchProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): Unit =
val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
println("sessionStartsWithMetadata")
sessionStartsWithMetadata.show(100, truncate = false)
println("endedSessionsWithMetadata")
endedSessionsWithMetadata.show(100, truncate = false)
// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
(new Timestamp(1), 0),
(new Timestamp(2000), 1),
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionOptionalMetadata = Vector(
(new Timestamp(1), 0),
// session `1` has no metadata
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionEndData = Vector(
(new Timestamp(10000), 0),
(new Timestamp(11000), 1),
(new Timestamp(12000), 2),
(new Timestamp(30000), 10)
)
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
在 ID 为 1
的示例会话中没有元数据,因此相应的元数据列是 null
。
加入数据的主要功能在def process(…)
中实现,使用批处理数据和流数据调用。
在批处理版本中,输出如预期:
sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
+---------+-----------------------+--------------------------------+
endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
|1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
+---------+-----------------------+--------------------------------+-------------------+---------+
但是当作为流处理运行相同的处理时,endedSessionsWithMetadata
的输出不包含没有元数据的会话 1
的条目:
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
|sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
+-------------------------+---------+-----------------------+--------------------------------+
-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
|endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+
-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
谁能解释为什么在流处理中没有“元数据”(sessionId=1
)的“结束”事件不存在?我需要做什么才能让它出现在输出中?
非常感谢!
【问题讨论】:
有水印5秒有什么效果? 我正在度假,所以我不会太沉迷。 @thebluephantom 感谢您的关注!不幸的是,5 秒的水印不会改变结果。 我上面的评论无效。从霍尔斯回来。跑过去,有点好奇。查阅手册。明天会做一些实验。 这没有返回任何内容:... sessionEndEvents("sessionEndTimestamp"), sessionEndEvents("sessionEndTimestamp").plus(expr(s"INTERVAL 10 seconds")) 【参考方案1】:经过大量测试,环顾四周并重新阅读手册:
这一定是 Spark 中的错误。 我还注意到这个帖子正在流通:https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E 并且虽然理解了全局与链式流-流连接,但这 将 imo 指向此类处理的问题。 我在 Spark Databricks 3.x 上运行无济于事。
【讨论】:
感谢您发送邮件列表。不幸的是,这似乎是 Spark 的一个限制。 但从手册中看不出来。尽管它们仅显示 2 个 DS 的 JOIN,但仅显示 DF。无论如何,我们现在都没有了。 无论如何都是好问题。 根据上述说明,已创建 Spark 票证:issues.apache.org/jira/browse/SPARK-33259以上是关于在 Spark 结构化流中将数据内部连接到左连接 DataFrame 时丢失条目的主要内容,如果未能解决你的问题,请参考以下文章
我正在学习如何从 Spark the Definitive Guide 一书中将 sqlite 连接到 spark