比较Scala中连续行中的列值
Posted
技术标签:
【中文标题】比较Scala中连续行中的列值【英文标题】:Compare column values in consecutive rows in Scala 【发布时间】:2017-01-03 13:29:36 【问题描述】:我是 Spark Scala 的新手。我遇到了一种情况,我必须比较数据集中特定列的值,例如:
源数据
Source Destination Distance
Austin Houston 200
Dallas Houston 400
Kansas Dallas 700
结果
Source1 Destination1 Distance1 Source2 Destination2 Distance2 DistDiff
Dallas Houston 400 Kansas Dallas 700 300
根据情况,我必须比较后续行的距离,如果差异大于或等于 300,则将记录保存在 Resultant 数据集中 700 - 400 = 300
我遇到的示例是具有在任何特定数据集上按行执行的函数,但我的场景是使用连续行。
【问题讨论】:
Spark RDD/DataFrame 条目没有排序。那么如何维护秩序呢? 我最近做了类似的事情......你可以使用mapPartitions
拼凑一些大部分时间都有效的东西。无论如何,您最终都会在部分边缘丢弃数据。
它将按我未包含在示例中的日期时间排序。
【参考方案1】:
您提到您可以按日期时间对行进行排序。因此,假设它使用sortBy
或sortByKey
进行排序以创建 ordered rdd,并且假设您有偶数行(因此每一行都有另一行来计算差异),您可以:
-
使用
zipWithIndex
为每一行指定一个索引。
通过过滤创建的索引,将 RDD 拆分为两个 RDD,一个具有偶数索引,一个具有奇数索引。
zip
将 RDD 拆分在一起,创建一个新的 Tuple2
的 RDD,其中偶数索引行位于左侧,奇数索引行位于右侧。
map
计算每行左右差的结果。
【讨论】:
如果您愿意接受几天的远程工作,请与我联系。我们需要你拥有的 scala/sparkexpertise 这不是一个完整的答案。首先,这只会计算偶数行和它们前面的奇数行之间的差异。它将不计算奇数行与它们之前的偶数行之间的差异。其次,没有提到 rdd 分区。这很重要,因为如果 rdd 的分区方式不同(至少在 pyspark 中),压缩将不起作用。【参考方案2】:这可以通过以下方式完成:
-
向排序的 rdd 添加索引列
确保 rdd 有偶数行 N
创建一个 rdd rdd_even1 以包含索引为 [0, N-2] 的偶数行
创建一个 rdd rdd_odd1 以包含奇数行 [1, N-1]
创建一个 rdd rdd_even2 以包含偶数行 [2, N-2]
创建一个 rdd rdd_odd2 以包含奇数行 [1, N-3]
现在您需要在压缩之前重新分区 rdd_even1 和 *rdd_odd1,因为如果两个 rdd 在所有分区中的元素数量不同(至少在 pyspark 中),压缩将不起作用。您可以使用 collect 和 parallelize 在内存中执行此操作,但很可能您必须将 rdd 写入 HDFS 并重新读取它们,以控制分区
对 rdd_even2 和 rdd_odd2 执行相同的操作
将第 7 步中的 rdd 压缩到 rdd_zip1
将第 8 步中的 rdd 压缩到 rdd_zip2
调用rdd_zip1.union(rdd_zip2)
现在您可以在联合上调用 map() 以获得具有所需差异的“结果”
祝你好运。
【讨论】:
感谢指正!另请注意,可以使用repartition 直接修改 RDD 的分区。在这种情况下,由于两个偶数/奇数 RDD 的大小相同(您可以填充其中一个以确保这一点),这应该为您提供zip
的匹配分区。以上是关于比较Scala中连续行中的列值的主要内容,如果未能解决你的问题,请参考以下文章