在 spark 中比较数据框中的行,以根据行的比较为列分配值
Posted
技术标签:
【中文标题】在 spark 中比较数据框中的行,以根据行的比较为列分配值【英文标题】:Compare rows in a dataframe in spark, to assign value to a column based on comparison of rows 【发布时间】:2022-01-20 06:23:33 【问题描述】:我的数据如下所示
旅程表
SERNR | TYPE |
---|---|
123 | null |
456 | null |
789 | null |
分段表
SERNR | Sgmnt | FROM-Station | TO-Station |
---|---|---|---|
123 | 01 | A | B |
123 | 02 | B | C |
123 | 03 | C | B |
123 | 04 | B | A |
456 | 01 | A | B |
456 | 02 | B | C |
456 | 03 | C | D |
456 | 04 | D | A |
789 | 04 | A | B |
我想加入这两个数据框/表并检查旅程站FROM
和TO
以确定旅程类型,即如果它的回程某种类型A
如果它的镜像返回某种类型@ 987654326@,如果是单程旅行,请输入C
类型计算如下
假设旅程 SERNR 123,旅程详情是 A->B , B->C, C->B,B->A ,这是一个镜像之旅,因为它的ABC-C然后是CB-A。
对于 789 它的 A->B 所以这是一个正常的旅程。
对于 456 其 A-> B, B->C , C->D , DA, 简称 ABC 然后 CDA,这是回归而非镜像
我真的不知道如何根据SERNR
对Dataframe 中的行进行比较,以通过检查同一SERNR
的FROM
和To
站来确定类型
如果我能得到一个指针来继续执行相同的操作,我真的很感激。
【问题讨论】:
【参考方案1】:您可以将FROM
TO
旅程列表收集到每个SERNR
的数组列中,然后加入数组元素以获得journey_path
(A-B-C...
)。
得到每个旅程的旅程路径后,可以使用when
表达式确定TYPE
:
normal
else : 如果 Journey_path 的反面 == Journey_path 则为mirror
否则为return
注意,在对FROM - TO
s的列表进行分组收集时,需要使用一个Window来保持段的顺序。
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = segment_df.select(
col("SERNR"),
array_join(
collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
"-"
).alias("journey_path")
).dropDuplicates(Seq("SERNR")).withColumn(
"TYPE",
when(
substring(col("journey_path"), 0, 1) =!= substring(col("journey_path"), -1, 1),
"normal"
).otherwise(
when(
reverse(col("journey_path")) === col("journey_path"),
"mirror"
).otherwise("return")
)
)
.drop("journey_path")
result.show
//+-----+------+
//|SERNR| TYPE|
//+-----+------+
//| 789|normal|
//| 456|return|
//| 123|mirror|
//+-----+------+
【讨论】:
【参考方案2】:使用 from_station 或 to_station 的 cllect_list,将其与 SERNR 分组并按段排序
【讨论】:
以上是关于在 spark 中比较数据框中的行,以根据行的比较为列分配值的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?