如何在火花中处理这个

Posted

技术标签:

【中文标题】如何在火花中处理这个【英文标题】:how to handle this in spark 【发布时间】:2020-01-03 10:41:27 【问题描述】:

我正在使用 spark-sql 2.4.x 版本,datastax-spark-cassandra-connector 用于 Cassandra-3.x 版本。和卡夫卡一起。

我有一个来自 kafka 主题的财务数据的场景。 data(基础数据集)包含 companyId、year、prev_year 字段信息。

如果列 year === prev_year 那么我需要加入不同的表,即 exchange_rates。

如果列 year =!= prev_year 那么我需要返回基本数据集本身

如何在 spark-sql 中做到这一点?

【问题讨论】:

这个问题与***.com/questions/59579922/… 不同吗? 【参考方案1】:

您可以针对您的情况参考以下方法。

scala> Input_df.show
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
|        1|2016|     2017|  12|
|        1|2017|     2017|21.4|
|        2|2018|     2017|11.7|
|        2|2018|     2018|44.6|
|        3|2016|     2017|34.5|
|        4|2017|     2017|  56|
+---------+----+---------+----+


scala> exch_rates.show
+---------+----+
|companyId|rate|
+---------+----+
|        1|12.3|
|        2|12.5|
|        3|22.3|
|        4|34.6|
|        5|45.2|
+---------+----+


scala> val equaldf = Input_df.filter(col("year") === col("prev_year"))

scala> val notequaldf = Input_df.filter(col("year") =!= col("prev_year"))

scala> val joindf  = notequaldf.alias("n").drop("rate").join(exch_rates.alias("e"), List("companyId"), "left")

scala> val finalDF = equaldf.union(joindf)

scala> finalDF.show()
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
|        1|2017|     2017|21.4|
|        2|2018|     2018|44.6|
|        4|2017|     2017|  56|
|        1|2016|     2017|12.3|
|        2|2018|     2017|12.5|
|        3|2016|     2017|22.3|
+---------+----+---------+----+

【讨论】:

@BdEngineer SOF 是什么意思? @BdEngineer 在这里查看我的答案***.com/questions/59669880/…

以上是关于如何在火花中处理这个的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花中处理 Integer.MAX_VALUE? [关闭]

文件压缩格式如何影响我的火花处理

如何使用火花流处理实时流数据/日志?

如何将火花流 DF 写入 Kafka 主题

如何在并行火花中运行转换

如何设置一个空结构,所有字段为空,在火花中为空