基于kafka分区的结构化流式读取
Posted
技术标签:
【中文标题】基于kafka分区的结构化流式读取【英文标题】:structured streaming read based on kafka partitions 【发布时间】:2018-10-09 20:26:22 【问题描述】:我正在使用 spark 结构化 Streaming 从 Kafka 主题读取传入消息并根据传入消息写入多个 parquet 表 因此,我创建了一个 readStream,因为 Kafka 源很常见,并且为每个 parquet 表在循环中创建了单独的写入流。这工作正常,但 readstream 正在创建一个瓶颈,因为它为每个 writeStream 创建一个 readStream 并且没有办法缓存已经读取的数据帧。
val kafkaDf=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", conf.servers)
.option("subscribe", conf.topics)
// .option("earliestOffset","true")
.option("failOnDataLoss",false)
.load()
foreach table
//filter the data from source based on table name
//write to parquet
parquetDf.writeStream.format("parquet")
.option("path", outputFolder + File.separator+ tableName)
.option("checkpointLocation", "checkpoint_"+tableName)
.outputMode("append")
.trigger(Trigger.Once())
.start()
现在每个写入流都在创建一个新的消费者组并从 Kafka 读取全部数据,然后进行过滤并写入 Parquet。这会产生巨大的开销。为了避免这种开销,我可以对 Kafka 主题进行分区,使其具有与表数量一样多的分区,然后读取流应该只从给定的分区中读取。但我没有看到将分区详细信息指定为 Kafka 读取流的一部分的方法。
【问题讨论】:
您不需要手动将 Spark 任务分配给 Kafka 分区。您只需要将执行程序的数量扩大到分区的数量。 谢谢板球。所以基本上在写给 kafka 时,我使用表名作为键。所以在 1 个 readStream 到多个 writeStream 的情况下。我看到每个作家都再次从 kafka 阅读,所以有没有办法限制它只阅读它要写的消息?说我 hv table1 和 table2 并且在写作时我将 kafka 密钥作为 table1 和 table2 。现在有一种方法可以让 table1 的 writestream 只读取 table1 的消息而不读取 table2 的消息。我大约有 50 张桌子,所以这造成了巨大的瓶颈 @Ajith .. 你有什么解决办法吗?我正在寻找相同的 【参考方案1】:如果数据量不是很高,编写自己的接收器,收集每个微批次的数据,那么你应该能够缓存该数据帧并写入不同的位置,虽然需要一些调整,但它会起作用
【讨论】:
【参考方案2】:您可以使用 foreachBatch 接收器并缓存数据帧。希望它有效
【讨论】:
以上是关于基于kafka分区的结构化流式读取的主要内容,如果未能解决你的问题,请参考以下文章
从多个 Kafka 主题读取的 Spark 结构化流式应用程序
Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)