比较Scala中连续行中的列值

Posted

技术标签:

【中文标题】比较Scala中连续行中的列值【英文标题】:Compare column values in consecutive rows in Scala 【发布时间】:2017-01-03 13:29:36 【问题描述】:

我是 Spark Scala 的新手。我遇到了一种情况,我必须比较数据集中特定列的值,例如:

源数据

Source  Destination Distance
Austin  Houston     200
Dallas  Houston     400
Kansas  Dallas      700 

结果

Source1  Destination1   Distance1   Source2  Destination2 Distance2  DistDiff     
Dallas   Houston        400         Kansas   Dallas       700        300

根据情况,我必须比较后续行的距离,如果差异大于或等于 300,则将记录保存在 Resultant 数据集中 700 - 400 = 300

我遇到的示例是具有在任何特定数据集上按行执行的函数,但我的场景是使用连续行。

【问题讨论】:

Spark RDD/DataFrame 条目没有排序。那么如何维护秩序呢? 我最近做了类似的事情......你可以使用mapPartitions 拼凑一些大部分时间都有效的东西。无论如何,您最终都会在部分边缘丢弃数据。 它将按我未包含在示例中的日期时间排序。 【参考方案1】:

您提到您可以按日期时间对行进行排序。因此,假设它使用sortBysortByKey 进行排序以创建 ordered rdd,并且假设您有偶数行(因此每一行都有另一行来计算差异),您可以:

    使用zipWithIndex为每一行指定一个索引。 通过过滤创建的索引,将 RDD 拆分为两个 RDD,一个具有偶数索引,一个具有奇数索引。 zip 将 RDD 拆分在一起,创建一个新的 Tuple2 的 RDD,其中偶数索引行位于左侧,奇数索引行位于右侧。 map计算每行左右差的结果。

【讨论】:

如果您愿意接受几天的远程工作,请与我联系。我们需要你拥有的 scala/sparkexpertise 这不是一个完整的答案。首先,这只会计算偶数行和它们前面的奇数行之间的差异。它将计算奇数行与它们之前的偶数行之间的差异。其次,没有提到 rdd 分区。这很重要,因为如果 rdd 的分区方式不同(至少在 pyspark 中),压缩将不起作用。【参考方案2】:

这可以通过以下方式完成:

    向排序的 rdd 添加索引列 确保 rdd 有偶数行 N 创建一个 rdd rdd_even1 以包含索引为 [0, N-2] 的偶数行 创建一个 rdd rdd_odd1 以包含奇数行 [1, N-1] 创建一个 rdd rdd_even2 以包含偶数行 [2, N-2] 创建一个 rdd rdd_odd2 以包含奇数行 [1, N-3] 现在您需要在压缩之前重新分区 rdd_even1 和 *rdd_odd1,因为如果两个 rdd 在所有分区中的元素数量不同(至少在 pyspark 中),压缩将不起作用。您可以使用 collectparallelize 在内存中执行此操作,但很可能您必须将 rdd 写入 HDFS 并重新读取它们,以控制分区 对 rdd_even2rdd_odd2 执行相同的操作 将第 7 步中的 rdd 压缩到 rdd_zip1 将第 8 步中的 rdd 压缩到 rdd_zip2 调用rdd_zip1.union(rdd_zip2) 现在您可以在联合上调用 map() 以获得具有所需差异的“结果”

祝你好运。

【讨论】:

感谢指正!另请注意,可以使用repartition 直接修改 RDD 的分区。在这种情况下,由于两个偶数/奇数 RDD 的大小相同(您可以填充其中一个以确保这一点),这应该为您提供zip 的匹配分区。

以上是关于比较Scala中连续行中的列值的主要内容,如果未能解决你的问题,请参考以下文章

根据火花数据框scala中的列值过滤行

使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

选择最后一组连续行中的第一行

添加一列,这是熊猫中连续行差异的结果

查询以计算Mysql中连续行中距离(经度,纬度)的总和

将包含列表的记录值与 Postgres 中的列值进行比较