Spark / Scala如何编写一个复杂的查询来遍历数据框并添加一列

Posted

技术标签:

【中文标题】Spark / Scala如何编写一个复杂的查询来遍历数据框并添加一列【英文标题】:Spark / Scala how to write a complex query that iterates through a dataframe and adds a column 【发布时间】:2018-03-17 11:47:39 【问题描述】:

我有一个包含以下行的数据框(我们称之为Dataframe 1):

+----------+------------+----------+
|COLLECTION|START_DAY_ID|END_DAY_ID|
+----------+------------+----------+
|  HIVER 19|    20190602|  20191130|
|    ETE 19|    20181202|  20190601|
+----------+------------+----------+

我有一个函数作为输入:start_dateend_date,并返回一个排列在 start_date 和 end_date 之间的日期数组。

这是函数的签名:

def days_in_range(start_day: String, end_day: String, df_calendar: DataFrame): Array[String]

我需要执行的是遍历 Dataframe 1 的每一行,从行中获取 start_date 和 end_date,然后将新值添加到包含 start_date 之间的天数数组的新列(中间天数)和 end_date 通过调用函数。

结果应该是这样的:

+------------+-----------------------+
| COLLECTION |                  days |
+------------+-----------------------+
|   ETE 2019 |[20191226,20191225,...]|
| HIVER 2019 |[20190626,20190825,...]|
+------------+-----------------------+

我尝试了一些方法,例如 WithColumn / map 和 reduce / udf 函数,但主要问题是,例如在 map 函数内部,我使用了另一个导致空指针异常的数据帧(与 udf 相同的问题)。

任何有关如何解决此问题的帮助将不胜感激。

【问题讨论】:

不要使用图片,而是复制文本格式 你为什么提到dataframe2?我觉得不用提了 你说得对,我在实现函数时已经使用了 Dataframe2 你能显示不起作用的代码吗?另外,您需要使用该功能吗?令我震惊的是,这可以作为join 后跟groupBy 来实现。 【参考方案1】:

您可以使用下面的udf 函数在START_DAY_IDEND_DAY_ID 列之间生成字符串日期数组

import org.apache.spark.sql.functions._
def days_in_range = udf((start_day: String, diff:Int)=>
  val format = new SimpleDateFormat("yyyyMMdd")
  val calStart = Calendar.getInstance
  val startDate = calStart.setTime(format.parse(start_day))
  val listBuffer = new ListBuffer[String]
  for(day <- 1 until diff) 
    calStart.add(Calendar.DATE, 1)
    listBuffer.append(format.format(calStart.getTime))
  
  listBuffer
)

diff 整数 是使用dateDiff 内置函数 派生的,同时将udf 函数调用为

df1.select(col("COLLECTION"), days_in_range(col("START_DAY_ID"), datediff(to_date(col("END_DAY_ID"), "yyyyMMdd"), to_date(col("START_DAY_ID"), "yyyyMMdd"))).as("days"))
  .show()

这应该给你

+----------+--------------------+
|COLLECTION|                days|
+----------+--------------------+
|  HIVER_19|[20190903, 201909...|
|    ETE_19|[20181203, 201812...|
+----------+--------------------+

希望回答对你有帮助

【讨论】:

这很有帮助。谢谢! 我很高兴@HelaChikhoui :) 很高兴听到这个答案有帮助

以上是关于Spark / Scala如何编写一个复杂的查询来遍历数据框并添加一列的主要内容,如果未能解决你的问题,请参考以下文章

spark sql - 如何在 spark sql 中编写动态查询

如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?

如何使用 Spark scala 从字符串格式的复杂 JSON 创建数据帧

如何使用 spark/scala 检查是不是存在大查询表

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

Scala - 如何在 Spark SQL 查询中将日期字符串转换为时间戳?