如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?

Posted

技术标签:

【中文标题】如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?【英文标题】:How to ingest data from two producers in kafka and join using spark structured streaming? 【发布时间】:2020-05-16 17:16:25 【问题描述】:

我正在尝试从两个 kafka 主题中读取数据,但我无法加入并找到最终数据框。 我的 kafka 主题是 CSVStreamRetail 和 OrderItems。

val spark = SparkSession
      .builder
      .appName("Spark-Stream-Example")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    val orderItemsSchema = new StructType()
      .add("order_item_id",IntegerType)
      .add("order_item_order_id",IntegerType)
      .add("order_item_product_id",IntegerType)
      .add("order_item_quantity",IntegerType)
      .add("order_item_subtotal",DoubleType)
      .add("order_item_product_price", DoubleType)

    import spark.implicits._

    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "CSVStreamRetail")
      .load()

    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "OrderItems")
      .load()

    val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
      .select("orders_data.*","timestamp")

    val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
      .select("order_items_data.*","timestamp")

    val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))

    finalDF
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()

我收到的输出是一个空数据帧。

【问题讨论】:

【参考方案1】:

首先请检查您是否在kafka 主题中接收数据。 在流-流连接的情况下,您应该始终至少在一个流中提供水印。我看到您想要执行内部联接。 所以我添加了200 seconds 水印,现在它在输出数据框中显示数据。

val spark = SparkSession
  .builder
  .appName("Spark-Stream-Example")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp")
  .getOrCreate()

val ordersSchema = new StructType()
  .add("order_id", IntegerType)
  .add("order_date", StringType)
  .add("order_customer_id", IntegerType)
  .add("order_status", StringType)

val orderItemsSchema = new StructType()
  .add("order_item_id",IntegerType)
  .add("order_item_order_id",IntegerType)
  .add("order_item_product_id",IntegerType)
  .add("order_item_quantity",IntegerType)
  .add("order_item_subtotal",DoubleType)
  .add("order_item_product_price", DoubleType)

import spark.implicits._

val df1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "CSVStreamRetail")
  .load()

val df2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "OrderItems")
  .load()

val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
  .select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
  .select("orders_data.*","timestamp")
  .withWatermark("timestamp","200 seconds")

val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
  .select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
  .select("order_items_data.*","timestamp")
  .withWatermark("timestamp","200 seconds")

val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))

finalDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()

使用 eventTimestamp 加入。 让我知道这是否有帮助。

【讨论】:

我知道加入两个流数据集水印和时间限制是可选的。如果未指定水印和时间约束,则数据将无限期地存储在状态中。在两侧设置水印和时间限制将相应地启用状态清理。 @SambhavKumar 谢谢。它现在正在工作。我现在还要指定一个基于时间戳的选择表达式。

以上是关于如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?的主要内容,如果未能解决你的问题,请参考以下文章

kafka专栏如何保证数据生产不重不漏的高可靠性

Druid Kafka 摄取(imply-2.2.3):kafka 错误 NoReplicaOnlineException

Apache Druid 数据摄取---本地数据和kafka流式数据

Oracle 到 Hadoop 数据的实时摄取

如何从同一生产者向不同的 Kafka 主题和模式注册表生成消息

Kafka 是如何实现事务的