Pysparkling 2 从 1 重置 monotonically_increasing_id
Posted
技术标签:
【中文标题】Pysparkling 2 从 1 重置 monotonically_increasing_id【英文标题】:Pysparkling 2 reset monotonically_increasing_id from 1 【发布时间】:2017-09-11 22:39:16 【问题描述】:我想将 spark 数据帧分成两部分,并为每个子数据帧定义行号。但我发现函数 monotonically_increasing_id 仍然会从原始数据帧中定义行号。
这是我在 python 中所做的:
# df is the original sparkframe
splits = df.randomSplit([7.0,3.0],400)
# add column rowid for the two subframes
set1 = splits[0].withColumn("rowid", monotonically_increasing_id())
set2 = splits[1].withColumn("rowid", monotonically_increasing_id())
# check the results
set1.select("rowid").show()
set2.select("rowid").show()
我希望两帧的 rowid 的前五个元素都是 1 到 5(或 0 到 4,记不清了):
set1: 1 2 3 4 5
set2: 1 2 3 4 5
但我实际得到的是:
set1: 1 3 4 7 9
set2: 2 5 6 8 10
这两个子帧的行 id 实际上是它们在原始 sparkframe df 中的行 id 而不是新的。
作为火花的新手,我正在寻求有关为什么会发生这种情况以及如何解决它的帮助。
【问题讨论】:
【参考方案1】:首先,您使用的是什么版本的 Spark? monotonically_increasing_id
方法的实现已经改变了几次。我可以在 Spark 2.0 中重现您的问题,但在 spark 2.2 中的行为似乎有所不同。所以这可能是在较新的 spark 版本中修复的错误。
话虽如此,您应该不期望monotonically_increasing_id
产生的值会连续增加。在您的代码中,我相信数据框只有一个分区。根据http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
生成的 ID 保证单调递增且 唯一的,但不是连续的。 当前的实现将 高 31 位的分区 ID,以及每个分区内的记录号 分区在低 33 位。假设是数据框 少于10亿个分区,每个分区少于8个 十亿条记录。
例如,考虑一个有两个分区的 DataFrame,每个分区有 3 个 记录。此表达式将返回以下 ID:0、1、2、 8589934592 (1L
因此,如果您的代码不应该期望 rowid 连续增加。
此外,您还应该考虑缓存场景和故障场景。即使 monotonically_increase_id 像你期望的那样工作——连续增加值,它仍然不起作用。如果您的节点发生故障怎么办?故障节点上的分区将从源或最后一个缓存/检查点重新生成,它们可能具有不同的顺序,因此具有不同的 rowid。逐出缓存也会导致问题。假设在生成数据帧并将其缓存到内存之后。如果它被逐出内存怎么办。未来的行动将尝试再次重新生成数据帧,从而给出不同的 rowid。
【讨论】:
谢谢兰。我正在使用火花 2.0。您对标记 rowid 连续增加有什么建议吗? 我想不出适用于所有场景的解决方案。假设 monotonically_increase_id 像你期望的那样工作——连续增加,它仍然对你不起作用。为什么?如果您的节点发生故障怎么办?故障节点上的分区将从源或最后一个缓存/检查点重新生成,它们可能具有不同的顺序,因此具有不同的 rowid。失败场景很少见。不过,另一种情况更为常见。假设在生成数据帧并将其缓存到内存之后。如果它被逐出内存怎么办。未来的操作将尝试再次重新生成数据帧。以上是关于Pysparkling 2 从 1 重置 monotonically_increasing_id的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Windows 上构建 Mono 3.4.0/3.4.1
合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono
Mono SVN最新代码或者Mono 1.2.5 支持IronPython 2.0