从多个 Kafka 主题读取的 Spark 结构化流式应用程序

Posted

技术标签:

【中文标题】从多个 Kafka 主题读取的 Spark 结构化流式应用程序【英文标题】:Spark structured streaming app reading from multiple Kafka topics 【发布时间】:2019-09-19 14:57:42 【问题描述】:

我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从多个 Kafka 主题中读取数据,进行一些相对简单的处理(主要是聚合和一些连接)并将结果发布到其他一些 Kafka 主题.所以多个流在同一个应用程序中处理。

我想知道如果我只设置 1 个订阅多个主题的直接 readStream 然后使用选择拆分流,从资源的角度(内存、执行程序、线程、Kafka 侦听器等)是否会有所不同,与每个主题 1 个 readStream 相比。

类似

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...

对比

t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")

其中一个比另一个更“有效”吗?我找不到任何关于这是否有影响的文档。

谢谢!

【问题讨论】:

【参考方案1】:

每个操作都需要完整的沿袭执行。你最好把它分成三个单独的 kafka 读取。否则,您将阅读每个主题 N 次,其中 N 是写入次数。

我真的不建议这样做,但如果您想将所有主题放在同一个阅读中,请执行以下操作:

streamingDF.writeStream.foreachBatch  (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.filter().write.format(...).save(...)  // location 1
  batchDF.filter().write.format(...).save(...)  // location 2
  batchDF.unpersist()

【讨论】:

我认为你的第一句话极大地提高了我对 Spark 结构化流中正在发生的事情的直觉。当“从头到尾”看它时,很明显,永远不应该在执行的血统中包含不必要的依赖关系。感谢您的洞察力! 每个流式写入器都有一个检查点目录,用于跟踪读取器中的读取偏移量。 Kafka 不在乎一条消息被阅读了多少次。 所以根据“动作”评论,如果我将 2 个流连接在一起并写入 1 个位置,那么一起读取 2 个流是否可以?意味着一次订阅多个主题。正确的? @JoeWiden 我很想知道检查点位置等其他考虑因素。【参考方案2】:

从资源(内存和核心)的角度来看,如果您在集群上将其作为多个流(多个驱动器-执行器)运行,则会有所不同。

对于第一种情况,你提到了-

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")... t1df = df.select(...).where("topic = 't1'")... t2df = df.select(...).where("topic = 't2'")...

考虑到您在上面提供了一个驱动程序和两个执行器。

第二种情况-

t1df = spark.readStream.format("kafka").option("subscribe", "t1") t2df = spark.readStream.format("kafka").option("subscribe", "t2")

您可以将它们作为不同的流运行 - 2 个驱动程序和 2 个执行程序(每个 1 个执行程序)。 在第二种情况下,需要更多的内存和内核来提供额外的驱动程序。

【讨论】:

我认为 Spark 应用程序只能有一个驱动程序。你是说核心吗? 是的,你是对的,Spark 应用程序只能有一个驱动程序。我的意思是说,您可以将第二种情况代码 sn-p 作为两个不同的应用程序运行,一个用于主题 t1,一个用于 t2。 感谢您的解释。我明确不想将流分成 2 个单独的应用程序。我的设置有很多不同的主题,但每个主题只有少量流量(最多每秒几条消息)。每个主题有 1 个应用程序会导致集群资源的大量管理开销,然后每个应用程序只会做很少的工作。所以我真的在考虑优化 1 个应用程序 那么最终的建议是什么? @jammann - 你能分享你的解决方案吗? 我正在运行少量应用程序 3-4,每个应用程序都包含少量查询 5-10。每个查询有 1 个应用程序对我来说开销太大。并且只有 1 个包含所有查询的应用程序对我来说效果不佳,往往会耗尽内存

以上是关于从多个 Kafka 主题读取的 Spark 结构化流式应用程序的主要内容,如果未能解决你的问题,请参考以下文章

基于kafka分区的结构化流式读取

从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题

Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?

从Kafka主题中读取结构化流

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?