如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?
Posted
技术标签:
【中文标题】如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?【英文标题】:How to ingest data from two producers in kafka and join using spark structured streaming? 【发布时间】:2020-05-16 17:16:25 【问题描述】:我正在尝试从两个 kafka
主题中读取数据,但我无法加入并找到最终数据框。
我的 kafka 主题是 CSVStreamRetail 和 OrderItems。
val spark = SparkSession
.builder
.appName("Spark-Stream-Example")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
val ordersSchema = new StructType()
.add("order_id", IntegerType)
.add("order_date", StringType)
.add("order_customer_id", IntegerType)
.add("order_status", StringType)
val orderItemsSchema = new StructType()
.add("order_item_id",IntegerType)
.add("order_item_order_id",IntegerType)
.add("order_item_product_id",IntegerType)
.add("order_item_quantity",IntegerType)
.add("order_item_subtotal",DoubleType)
.add("order_item_product_price", DoubleType)
import spark.implicits._
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "CSVStreamRetail")
.load()
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "OrderItems")
.load()
val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
.select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
.select("orders_data.*","timestamp")
val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
.select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
.select("order_items_data.*","timestamp")
val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))
finalDF
.writeStream
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
我收到的输出是一个空数据帧。
【问题讨论】:
【参考方案1】:首先请检查您是否在kafka
主题中接收数据。
在流-流连接的情况下,您应该始终至少在一个流中提供水印。我看到您想要执行内部联接。
所以我添加了200 seconds
水印,现在它在输出数据框中显示数据。
val spark = SparkSession
.builder
.appName("Spark-Stream-Example")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
val ordersSchema = new StructType()
.add("order_id", IntegerType)
.add("order_date", StringType)
.add("order_customer_id", IntegerType)
.add("order_status", StringType)
val orderItemsSchema = new StructType()
.add("order_item_id",IntegerType)
.add("order_item_order_id",IntegerType)
.add("order_item_product_id",IntegerType)
.add("order_item_quantity",IntegerType)
.add("order_item_subtotal",DoubleType)
.add("order_item_product_price", DoubleType)
import spark.implicits._
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "CSVStreamRetail")
.load()
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "OrderItems")
.load()
val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
.select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
.select("orders_data.*","timestamp")
.withWatermark("timestamp","200 seconds")
val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
.select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
.select("order_items_data.*","timestamp")
.withWatermark("timestamp","200 seconds")
val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))
finalDF
.writeStream
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
使用 eventTimestamp 加入。 让我知道这是否有帮助。
【讨论】:
我知道加入两个流数据集水印和时间限制是可选的。如果未指定水印和时间约束,则数据将无限期地存储在状态中。在两侧设置水印和时间限制将相应地启用状态清理。 @SambhavKumar 谢谢。它现在正在工作。我现在还要指定一个基于时间戳的选择表达式。以上是关于如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?的主要内容,如果未能解决你的问题,请参考以下文章
Druid Kafka 摄取(imply-2.2.3):kafka 错误 NoReplicaOnlineException
Apache Druid 数据摄取---本地数据和kafka流式数据