如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

Posted

技术标签:

【中文标题】如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?【英文标题】:How to read from Kafka and Query from an external store like Cassandra in Spark Structured Streaming? 【发布时间】:2017-09-07 08:03:59 【问题描述】:

如何在 Spark Structured Streaming 中从 Kafka 和 Query 等外部存储中读取数据?

我从 Kafka 获得消息流,我想对其应用 Map 操作,对于每个键,我想查询像 Cassandra 这样的数据存储,并获取该键的更多信息并在流上应用进一步的操作。如何使用 Spark Structured Streaming 2.2.0 做到这一点?

【问题讨论】:

【参考方案1】:

Kafka 结构化流可以与静态数据帧连接。根据documentation,您可以这样做:

val staticDf = spark.read. ... // read from Cassandra
val streamingDf = spark.readStream. ... // read from stream


// example of join to get information from both Cassandra and stream
streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF

【讨论】:

【参考方案2】:

要以流的形式从 kafka 中读取,

val spark = SparkSession
              .builder
              .appName("kafka-reading")
              .getOrCreate()

   val df = spark
           .readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("startingOffsets", "latest")
           .option("subscribe", topicName)
           .load()
           .selectExpr("CAST (key AS STRING)", "CAST (value AS STRING)").as[(String, String)]

如果dataframe的结构比较复杂,在cast的时候需要提供dataframe的schema。

要执行操作,首先需要使用水印将一些数据累积一定时间,即 10 秒。加水印后,您可以应用 groupBy 以便您可以对其进行聚合。收集键和值作为列表。然后通过遍历键列表,您可以使用键从 cassandra 获取数据 有关如何进行水印和应用聚合的完整信息。 可以参考Structured Streaming

【讨论】:

以上是关于如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?的主要内容,如果未能解决你的问题,请参考以下文章

SPARK 结构化流中的 StructField 是不是存在错误

Spark 结构化流中的临时视图

Spark 结构化流中的外部连接

带有自定义接收器的 Spark 结构化流中的输入行数

如何在 Spark 结构化流中获取书面记录的数量?

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?