如何在 Spark 结构化流中获取书面记录的数量?
Posted
技术标签:
【中文标题】如何在 Spark 结构化流中获取书面记录的数量?【英文标题】:How to get number of written records in spark structured streaming? 【发布时间】:2018-02-01 14:43:46 【问题描述】:我在单个 Spark 会话中配置了一些结构化流。我需要知道在每个流中读取和写入了多少条记录。 例如,我有这两个流:
-
read-s3 -> 转换 -> write-s3
read-s3 -> 转换 -> write-db
我知道使用 SparkListener().onTaskEnd(),但那时我没有查询名称,taskEnd.taskMetrics().outputMetrics().recordsWritten()
始终为 0,所以它不是一个选项。
另一种方法是使用 dataset.map() 中的累加器进行增量计算。但这不是要写入的记录数,而是要写入的记录(如果 sink 不失败)。
除了我尝试使用 StreamingQueryListener(我用它来获取 numInputRows
)之外,我找不到任何关于写入记录数量的指标。
是否有可能获得这种指标?
【问题讨论】:
还是custom sink吗? 没有。那篇文章是关于修复一些输入行的,但在这里我问的是输出记录的数量,使用哪个接收器并不重要。当然,有了自定义接收器,我可以自己添加这些指标,但如果我使用 FileStreamSink 该怎么办? FileStreamSink (issues.apache.org/jira/browse/SPARK-23288) 中存在一个错误,已在 2.3.1 版中修复 请将其作为答案发布 (***.com/help/self-answer)。它将对未来的访问者更加可见,并有助于解决问题。 【参考方案1】:a bug in FileStreamSink 已在 2.3.1 版中修复。
作为一种解决方法,在 map 函数中使用 accumulators 在写入接收器之前计算记录数。
【讨论】:
以上是关于如何在 Spark 结构化流中获取书面记录的数量?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?
如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?
在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务