使用 Apache Spark Batch 对 Apache Kafka 进行偏移管理
Posted
技术标签:
【中文标题】使用 Apache Spark Batch 对 Apache Kafka 进行偏移管理【英文标题】:Offset Management For Apache Kafka With Apache Spark Batch 【发布时间】:2017-08-28 14:04:57 【问题描述】:我正在编写一个从 Kafka 主题读取的 Spark (v2.2) 批处理作业。 Spark 作业使用 cron 进行调度。 我不能使用 Spark Structured Streaming,因为不支持非基于时间的窗口。
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("subscribe", s"kafka_topic")
我需要为 kafka 主题设置偏移量,以便知道从哪里开始下一个批处理作业。我该怎么做?
【问题讨论】:
【参考方案1】:我猜你正在使用 KafkaUtils 创建流,你可以将它作为参数传递。
val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))
希望这会有所帮助!
【讨论】:
投反对票 接受批量查询,不允许使用最新的偏移量。 spark.apache.org/docs/latest/… 我必须传递给 Spark Streaming 才能从上次查询中断的地方恢复新查询。以上是关于使用 Apache Spark Batch 对 Apache Kafka 进行偏移管理的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark 使用 SQL 函数 nTile 对数据进行分区