如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?
Posted
技术标签:
【中文标题】如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?【英文标题】:How to read from Kafka and Query from an external store like Cassandra in Spark Structured Streaming? 【发布时间】:2017-09-07 08:03:59 【问题描述】:如何在 Spark Structured Streaming 中从 Kafka 和 Query 等外部存储中读取数据?
我从 Kafka 获得消息流,我想对其应用 Map 操作,对于每个键,我想查询像 Cassandra 这样的数据存储,并获取该键的更多信息并在流上应用进一步的操作。如何使用 Spark Structured Streaming 2.2.0 做到这一点?
【问题讨论】:
【参考方案1】:Kafka 结构化流可以与静态数据帧连接。根据documentation,您可以这样做:
val staticDf = spark.read. ... // read from Cassandra
val streamingDf = spark.readStream. ... // read from stream
// example of join to get information from both Cassandra and stream
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
【讨论】:
【参考方案2】:要以流的形式从 kafka 中读取,
val spark = SparkSession
.builder
.appName("kafka-reading")
.getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "latest")
.option("subscribe", topicName)
.load()
.selectExpr("CAST (key AS STRING)", "CAST (value AS STRING)").as[(String, String)]
如果dataframe的结构比较复杂,在cast的时候需要提供dataframe的schema。
要执行操作,首先需要使用水印将一些数据累积一定时间,即 10 秒。加水印后,您可以应用 groupBy 以便您可以对其进行聚合。收集键和值作为列表。然后通过遍历键列表,您可以使用键从 cassandra 获取数据 有关如何进行水印和应用聚合的完整信息。 可以参考Structured Streaming
【讨论】:
以上是关于如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?的主要内容,如果未能解决你的问题,请参考以下文章