在 spark 中比较数据框中的行,以根据行的比较为列分配值

Posted

技术标签:

【中文标题】在 spark 中比较数据框中的行,以根据行的比较为列分配值【英文标题】:Compare rows in a dataframe in spark, to assign value to a column based on comparison of rows 【发布时间】:2022-01-20 06:23:33 【问题描述】:

我的数据如下所示

旅程表

SERNR TYPE
123 null
456 null
789 null

分段表

SERNR Sgmnt FROM-Station TO-Station
123 01 A B
123 02 B C
123 03 C B
123 04 B A
456 01 A B
456 02 B C
456 03 C D
456 04 D A
789 04 A B

我想加入这两个数据框/表并检查旅程站FROMTO以确定旅程类型,即如果它的回程某种类型A如果它的镜像返回某种类型@ 987654326@,如果是单程旅行,请输入C

类型计算如下

假设旅程 SERNR 123,旅程详情是 A->B , B->C, C->B,B->A ,这是一个镜像之旅,因为它的ABC-C然后是CB-A

对于 789 它的 A->B 所以这是一个正常的旅程。

对于 456A-> B, B->C , C->D , DA, 简称 ABC 然后 CDA,这是回归而非镜像

我真的不知道如何根据SERNR 对Dataframe 中的行进行比较,以通过检查同一SERNRFROMTo 站来确定类型

如果我能得到一个指针来继续执行相同的操作,我真的很感激。

【问题讨论】:

【参考方案1】:

您可以将FROMTO 旅程列表收集到每个SERNR 的数组列中,然后加入数组元素以获得journey_path (A-B-C...)。

得到每个旅程的旅程路径后,可以使用when表达式确定TYPE

如果第一个 FROM != 最后一个 TO 那么它是 normal else : 如果 Journey_path 的反面 == Journey_path 则为mirror 否则为return

注意,在对FROM - TOs的列表进行分组收集时,需要使用一个Window来保持段的顺序。

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val result = segment_df.select(
    col("SERNR"),
    array_join(
      collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
      "-"
    ).alias("journey_path")
  ).dropDuplicates(Seq("SERNR")).withColumn(
    "TYPE",
    when(
      substring(col("journey_path"), 0, 1) =!= substring(col("journey_path"), -1, 1),
      "normal"
    ).otherwise(
      when(
        reverse(col("journey_path")) === col("journey_path"),
        "mirror"
      ).otherwise("return")
    )
  )
  .drop("journey_path")

result.show
//+-----+------+
//|SERNR|  TYPE|
//+-----+------+
//|  789|normal|
//|  456|return|
//|  123|mirror|
//+-----+------+

【讨论】:

【参考方案2】:

使用 from_station 或 to_station 的 cllect_list,将其与 SERNR 分组并按段排序

【讨论】:

以上是关于在 spark 中比较数据框中的行,以根据行的比较为列分配值的主要内容,如果未能解决你的问题,请参考以下文章

比较熊猫数据框中的行值

如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?

如何根据 R 中的行标记数据框中的所有变量

对 Spark 数据框中的行进行洗牌

如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作

如果所有行的列中只有一个值,则折叠 Pandas 数据框中的行