如何在 Spark 结构化流式连接中选择最新记录

Posted

技术标签:

【中文标题】如何在 Spark 结构化流式连接中选择最新记录【英文标题】:How to pick latest record in spark structured streaming join 【发布时间】:2020-01-03 14:04:25 【问题描述】:

我使用 spark-sql 2.4.x 版本,datastax-spark-cassandra-connector 用于 Cassandra-3.x 版本。和卡夫卡一起。

我有如下货币样本的汇率元数据:

val ratesMetaDataDf = Seq(
     ("EUR","5/10/2019","1.130657","USD"),
     ("EUR","5/9/2019","1.13088","USD")
     ).toDF("base_code", "rate_date","rate_value","target_code")
.withColumn("rate_date", to_date($"rate_date" ,"MM/dd/yyyy").cast(DateType))
.withColumn("rate_value", $"rate_value".cast(DoubleType))

我从 kafka 主题收到的销售记录如下(示例) :

val kafkaDf = Seq((15,2016, 4, 100.5,"USD","2021-01-20","EUR",221.4)
                                ).toDF("companyId", "year","quarter","sales","code","calc_date","c_code","prev_sales")

要计算 "prev_sales" ,我需要获取它的 "c_code" 各自的 "rate_value",它最接近 "calc_date",即 rate_date"

我在做什么如下

val w2 = Window.orderBy(col("rate_date") desc)
val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
                                   .where( ($"k.c_code" === $"e.base_code") &&
                                           ($"rate_date" < $"calc_date")
                                         ).orderBy($"rate_date" desc)
                                  .withColumn("row",row_number.over(w2))
                                  .where($"row" === 1).drop("row")
                                  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
                                  .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

在上面为给定的“rate_date”获取最近的记录(即“5/10/2019”来自 ratesMetaDataDf)我使用 window 和 row_number 函数并按“desc”对记录进行排序。

但是在 spark-sql 流中它会导致如下错误

"
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"

那么如何获取第一条记录加入到上面。

【问题讨论】:

【参考方案1】:

用下面的代码替换你最后的代码部分。此代码将执行left join 并计算日期差异calc_daterate_date。接下来Window 函数我们将选择最近的日期并使用与您相同的计算来计算prev_sales

请注意我添加了一个过滤条件filter(col("diff") &gt;=0), 它将处理calc_date &lt; rate_date 的情况。我加了几个 更多记录,以便更好地了解此案例。

scala> ratesMetaDataDf.show
+---------+----------+----------+-----------+
|base_code| rate_date|rate_value|target_code|
+---------+----------+----------+-----------+
|      EUR|2019-05-10|  1.130657|        USD|
|      EUR|2019-05-09|   1.12088|        USD|
|      EUR|2019-12-20|    1.1584|        USD|
+---------+----------+----------+-----------+


scala> kafkaDf.show
+---------+----+-------+-----+----+----------+------+----------+
|companyId|year|quarter|sales|code| calc_date|c_code|prev_sales|
+---------+----+-------+-----+----+----------+------+----------+
|       15|2016|      4|100.5| USD|2021-01-20|   EUR|     221.4|
|       15|2016|      4|100.5| USD|2019-06-20|   EUR|     221.4|
+---------+----+-------+-----+----+----------+------+----------+


scala>  val W = Window.partitionBy("companyId","year","quarter","sales","code","calc_date","c_code","prev_sales").orderBy(col("diff"))

scala>   val rateJoinResultDf= kafkaDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
                                         .withColumn("diff",datediff(col("calc_date"), col("rate_date")))
                                         .filter(col("diff") >= 0)
                                         .withColumn("closedate", row_number.over(W))
                                         .filter(col("closedate") === 1)
                                         .drop("diff", "closedate")
                                         .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
                                         .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

scala> rateJoinResultDf.show
+---------+----+-------+-----+----+----------+----------+
|companyId|year|quarter|sales|code| calc_date|prev_sales|
+---------+----+-------+-----+----+----------+----------+
|       15|2016|      4|100.5| USD|2021-01-20| 256.46976|
|       15|2016|      4|100.5| USD|2019-06-20| 250.32746|
+---------+----+-------+-----+----+----------+----------+ 

【讨论】:

我在 KafkaDF 的所有列上都使用了分区。当我们将它与 ratesMetaDataDf 连接时,可能会有一对多连接,在这种情况下,我们将为 KafkaDF 的单个特定行提供多条记录,并且需要选择具有最接近日期的单行记录。如果我使用特定列,那么 KafkaDF 的单行就不可能存在唯一性。在我的 KafkaDF 中,如果我在分区中跳过“calc_date”,那么 Windows 将给出两条记录的单条记录。 您可以在 Join like year 中添加更多列。这将提供更少的数据。我可以将 min 用于关闭日期,但最后我们必须检查所有日期差异的最小值。所以如果我使用最小或行号是一样的。我尝试删除分区并使用 groupBy 但问题是我可以对所有列 companyId、year、quarter、sales、code、calc_date、c_code、prev_sales 进行 groupBy,因为 prev_sales 值在计算后会有所不同。如果我稍后进行计算,则 rate_value 列将不可用。如果可能的话,我仍在寻找任何优化,如果有什么我会更新 其他解决方案您可以尝试将流数据写入暂存表并使用另一个火花作业对其进行操作。这取决于您在流媒体中获得了多少数据以及在ratesMetaDataDf 中获得了多少数据 我想弄清楚我在回答中使用的案例,这可能是场景吗?我的意思是你会在 calc_date 中得到过去的日期吗?他们将以与 Meta DF 不同的日期加入。就像 calc_date 2021-01-20 最接近的日期是 2019-12-20 和 2019-06-20 最接近的日期是 2019-05-10。如果这是正确的场景,那么将很难解决这个问题,如果不是,那么您可以先从 Meta 表中选择最新日期,然后将其与不需要任何 snuffle(通过 paritionBy 或 GroupBy)的 KafkaDF 加入。这会更快 将 KafkaDF 分成两部分,如 df = kafkadf.filter(col("A") === col("B")) 和 df_1 = kafkadf.filter(col("A") = != col(“B”))。在它只加入 df_1 并生成 df_1_join 之后。在最终合并 df 和 df_1_join 使用 Union

以上是关于如何在 Spark 结构化流式连接中选择最新记录的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流式蓝/绿部署

从多个 Kafka 主题读取的 Spark 结构化流式应用程序

Spark Structured Streaming - 如何按最新和聚合计数进行重复数据删除

Spark:如何使用 RowEncoder 创建流式数据集?

从 spark 数据框中选择最新记录

带有广播连接的 Spark 流式传输