基于kafka分区的结构化流式读取

Posted

技术标签:

【中文标题】基于kafka分区的结构化流式读取【英文标题】:structured streaming read based on kafka partitions 【发布时间】:2018-10-09 20:26:22 【问题描述】:

我正在使用 spark 结构化 Streaming 从 Kafka 主题读取传入消息并根据传入消息写入多个 parquet 表 因此,我创建了一个 readStream,因为 Kafka 源很常见,并且为每个 parquet 表在循环中创建了单独的写入流。这工作正常,但 readstream 正在创建一个瓶颈,因为它为每个 writeStream 创建一个 readStream 并且没有办法缓存已经读取的数据帧。

val kafkaDf=spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.servers)
      .option("subscribe", conf.topics)
      //  .option("earliestOffset","true")
      .option("failOnDataLoss",false)
      .load()

foreach table     
//filter the data from source based on table name
//write to parquet
 parquetDf.writeStream.format("parquet")
        .option("path", outputFolder + File.separator+ tableName)
        .option("checkpointLocation", "checkpoint_"+tableName)
        .outputMode("append")
        .trigger(Trigger.Once())
       .start()

现在每个写入流都在创建一个新的消费者组并从 Kafka 读取全部数据,然后进行过滤并写入 Parquet。这会产生巨大的开销。为了避免这种开销,我可以对 Kafka 主题进行分区,使其具有与表数量一样多的分区,然后读取流应该只从给定的分区中读取。但我没有看到将分区详细信息指定为 Kafka 读取流的一部分的方法。

【问题讨论】:

您不需要手动将 Spark 任务分配给 Kafka 分区。您只需要将执行程序的数量扩大到分区的数量。 谢谢板球。所以基本上在写给 kafka 时,我使用表名作为键。所以在 1 个 readStream 到多个 writeStream 的情况下。我看到每个作家都再次从 kafka 阅读,所以有没有办法限制它只阅读它要写的消息?说我 hv table1 和 table2 并且在写作时我将 kafka 密钥作为 table1 和 table2 。现在有一种方法可以让 table1 的 writestream 只读取 table1 的消息而不读取 table2 的消息。我大约有 50 张桌子,所以这造成了巨大的瓶颈 @Ajith .. 你有什么解决办法吗?我正在寻找相同的 【参考方案1】:

如果数据量不是很高,编写自己的接收器,收集每个微批次的数据,那么你应该能够缓存该数据帧并写入不同的位置,虽然需要一些调整,但它会起作用

【讨论】:

【参考方案2】:

您可以使用 foreachBatch 接收器并缓存数据帧。希望它有效

【讨论】:

以上是关于基于kafka分区的结构化流式读取的主要内容,如果未能解决你的问题,请参考以下文章

从多个 Kafka 主题读取的 Spark 结构化流式应用程序

Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)

结构化流式处理:由于检查点数据而重新启动时出现流式处理异常

Kafka 结构化流式处理 KafkaSourceProvider 无法实例化

Kafka 原理以及分区分配策略剖析

kafka主题分区的数量和数据中不同键的数量