如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?

Posted

技术标签:

【中文标题】如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?【英文标题】:How to use Spark Structured Streaming with Kafka Direct Stream? 【发布时间】:2017-01-09 14:08:01 【问题描述】:

我遇到了Structured Streaming with Spark,它有一个从 S3 存储桶持续消费并将处理后的结果写入 mysql 数据库的示例。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

这如何与Spark Kafka Streaming 一起使用?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

有没有办法在不使用stream.foreachRDD(rdd => )的情况下将这两个示例结合起来?

【问题讨论】:

【参考方案1】:

有没有办法在不使用的情况下结合这两个例子 stream.foreachRDD(rdd => )?

还没有。 Spark 2.0.0 不支持结构化流的 Kafka 接收器。这是 Spark Streaming 的创建者之一 Spark 2.1.0 according to Tathagata Das 应该提供的功能。 Here is the relevant JIRA issue。

编辑:(2018 年 11 月 29 日)

是的,Spark 2.2 及更高版本是可能的。

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

查看SO post(read and write on Kafka topic with Spark streaming) 了解更多信息。

编辑:(06/12/2016)

结构化流的 Kafka 0.10 集成现在是 expiramentaly supported in Spark 2.0.2:

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

【讨论】:

有没有办法跟踪进度功能?例如。 Jira 故事、功能请求等? @ike_love 是的,你可以找到它here (Kafka)Alpha 版本请参考以下文档:spark.apache.org/docs/latest/…【参考方案2】:

我在从 Kafka 源读取并写入 Cassandra 接收器时遇到了类似的问题。在这里创建了一个简单的项目kafka2spark2cassandra,分享以防对任何人有帮助。

【讨论】:

@Sokia - 您的项目运行良好、干净且独立。谢谢。

以上是关于如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark 结构化流与 Confluent Schema Registry 集成

Apache Spark 结构化流与 Apache Flink:有啥区别?

Spark Streaming + Kafka vs Just Kafka

卡夫卡流与卡夫卡消费者如何决定使用啥

如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?