如何使用新列scala在内存转换中应用窗口函数
Posted
技术标签:
【中文标题】如何使用新列scala在内存转换中应用窗口函数【英文标题】:how to apply window function in memory transformation with new column scala 【发布时间】:2020-10-16 21:48:20 【问题描述】:我有一个数据框,我想将其转换为以下输出,其中每一行 start_duration 和 end_duration 将由前一行 start_duration 和 end_duration 产生,请告诉我如何使用 scala 在 spark 中实现它。
以下是计算 start_duration 和 end_duration 的公式:
start_duration = max(previous end_duration + 1, current date);
end_duration = min(presciption_end date, start_duration + duration – 1)
下面是我的输入数据框:
+--------
--------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|prescription_uid|patient_uid|ndc |label |dispensation_uid|date |duration|start_date|end_date |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|0 |0 |16714-128|sinvastatin|0 |2015-06-10|30 |2015-06-01|2015-12-01|
|0 |0 |16714-128|sinvastatin|1 |2015-07-15|30 |2015-06-01|2015-12-01|
|0 |0 |16714-128|sinvastatin|2 |2015-08-01|30 |2015-06-01|2015-12-01|
|0 |0 |16714-128|sinvastatin|3 |2015-10-01|30 |2015-06-01|2015-12-01|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
预期输出数据帧:
EXPECTED RESULT:
+--------
--------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|prescription_uid|patient_uid|ndc |label |dispensation_uid|date |duration|start_date|end_date |first_start_duration|first_end_duration|start_duration|end_duration|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|0 |0 |16714-128|sinvastatin|0 |2015-06-10|30 |2015-06-01|2015-12-01|2015-06-10 |2015-07-09 |2015-06-10 |2015-07-09 |
|0 |0 |16714-128|sinvastatin|1 |2015-07-15|30 |2015-06-01|2015-12-01|2015-06-10 |2015-07-09 |2015-07-15 |2015-08-13 |
|0 |0 |16714-128|sinvastatin|2 |2015-08-01|30 |2015-06-01|2015-12-01|2015-06-10 |2015-07-09 |2015-08-14 |2015-09-13 |
|0 |0 |16714-128|sinvastatin|3 |2015-10-01|30 |2015-06-01|2015-12-01|2015-06-10 |2015-07-09 |2015-10-01 |2015-10-30 |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
Code tried :
val windowByPatient = Window.partitionBy($"patient_uid").orderBy($"date")
val windowByPatientBeforeCurrentRow = windowByPatient.rowsBetween(Window.unboundedPreceding, -1)
joinedPrDF = joinedPrDF
.withColumn("first_start_duration", firstStartDuration(first($"date").over(windowByPatient), $"start_date"))
.withColumn("first_end_duration", firstEndDuration($"first_start_duration", $"end_date", $"duration"))
.withColumn("start_duration", when(count("*").over(windowByPatient) === 1, $"first_start_duration")
.otherwise(startDurationCalc($"first_start_duration", $"date", $"start_date", coalesce(sum($"duration").over(windowByPatientBeforeCurrentRow), lit("0")))))
.withColumn("end_duration", when(count("*").over(windowByPatient) === 1, $"first_end_duration")
.otherwise(endDurationCalc($"end_date", $"start_duration", $"duration")))
UDF:
val startDurationCalc = udf( (firstStrtDur:java.sql.Date, currentDsDate:java.sql.Date,
prsStartDate:java.sql.Date,duration:Int) =>
println("==="+firstStrtDur+"==="+currentDsDate +"==="+prsStartDate +"==="+duration )
var startDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(duration))
if (startDate.after(currentDsDate))
startDate
else
currentDsDate
: java.sql.Date)
val endDurationCalc = udf( (prsEndDate:java.sql.Date, startDuration:java.sql.Date,duration:Int) =>
println("endDateCalcContRow ==="+prsEndDate+"==="+startDuration +"==="+duration )
val currEndDate = java.sql.Date.valueOf(startDuration.toLocalDate.plusDays(duration-1))
if (currEndDate.before(prsEndDate))
currEndDate
else
prsEndDate
: java.sql.Date)
【问题讨论】:
能否请您提供startDateCalcContRow
和endDateCalcContRow
的代码
两个 udf 的发布代码
【参考方案1】:
您不应该期望窗口函数对数据框中不存在的数据进行计算,而是在执行期间计算(您称其为“内存行中”)。这是不可能的。
您可以尝试不同的方法。根据duration
计算第一个start_duration
中的每个duration
(您可以考虑可能的差距)。
val windowByPatient = Window.partitionBy("patient_uid").orderBy("date")
val windowByPatientBeforeCurrentRow = windowByPatient.rowsBetween(Window.unboundedPreceding, -1)
data
.withColumn("previous_date", lag("date", 1).over(windowByPatient))
.withColumn("diff_from_prev", datediff(col("date"), coalesce(col("previous_date"), col("date"))))
.withColumn("diff_with_duration", when(col("diff_from_prev") >= lag("duration", 1, 0).over(windowByPatient), col("diff_from_prev")).otherwise(col("duration")))
.withColumn("first_date_by_patient", first("date").over(windowByPatient))
.withColumn("duration_from_first_with_gaps", col("diff_with_duration") + coalesce(sum("diff_from_prev").over(windowByPatientBeforeCurrentRow), lit("0")))
.withColumn("start_duration", expr("date_add(first_date_by_patient, duration_from_first_with_gaps)"))
.withColumn("end_duration", expr("date_add(start_duration, duration - 1)"))
.select((data.columns ++ Seq("start_duration", "end_duration")).map(col): _*)
.show()
date_add
被包裹在expr
中,因为它以Int
作为第二个参数,但可以与sql 上下文中的列一起使用。
【讨论】:
是的,end_duration 等于“start_duration + duration -1”,我用date_add(start_duration, diff_in_days)
计算它,其中 diff_in_days = col("duration") - 1
最后一行的示例数据结果应该是:start_duration = 2015-09-08, end_duration = 2015-10-07
@SanjeevThakur 我已经发布了我的解决方案的输出,我认为这正是你所期望的
如果中间有间隙,则将持续时间添加到第一次开始持续时间将不起作用。
gap是什么意思,能发个例子吗?【参考方案2】:
以下是使用滞后窗口函数计算上一个持续时间和上一个分配日期的最终开始持续时间计算器:
val startDurationCalc = udf((currentDsDate: java.sql.Date, prevDsDate: java.sql.Date, prevDuration: Int, prsEndDate: java.sql.Date,
firstStrtDur:java.sql.Date,acDuration:Int) =>
println("startDurationCalc===currentDsDate===" + currentDsDate + "===prevDsDate===" + prevDsDate +
"===prevDuration===" + prevDuration +"===prsEndDate==="+prsEndDate+"===firstStrtDur=="+firstStrtDur+"===acDuration==="+acDuration)
val prevDurStartDate = prevDsDate.toLocalDate.plusDays(prevDuration - 1)
var derivedDsDate = java.sql.Date.valueOf(prevDurStartDate.plusDays(1))
val accumulatedDSDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(acDuration))
if (derivedDsDate.before(accumulatedDSDate))
derivedDsDate = accumulatedDSDate
if (derivedDsDate.after(prsEndDate))
val derPrsEndDate = java.sql.Date.valueOf(prsEndDate.toLocalDate.plusDays(1))
derPrsEndDate
else
if (currentDsDate.after(derivedDsDate))
currentDsDate
else
derivedDsDate
: java.sql.Date).asNondeterministic()
【讨论】:
以上是关于如何使用新列scala在内存转换中应用窗口函数的主要内容,如果未能解决你的问题,请参考以下文章
如何在窗口 scala/spark 中使用 partitionBy 函数