如何在pyspark中使用第一个和最后一个函数?

Posted

技术标签:

【中文标题】如何在pyspark中使用第一个和最后一个函数?【英文标题】:How to use first and last function in pyspark? 【发布时间】:2017-03-30 09:57:10 【问题描述】:

我使用 first 和 last 函数来获取一列的第一个和最后一个值。但是,我发现这两个功能都不像我想象的那样工作。我提到了answer @zero323,但我仍然对两者感到困惑。代码如下:

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)
]).toDF(["k", "v"])
w = Window().partitionBy("k").orderBy('k','v')

df.select(F.col("k"), F.last("v",True).over(w).alias('v')).show()

结果:

+---+----+
|  k|   v|
+---+----+
|  b|   1|
|  b|   3|
|  a|null|
|  a|  -1|
|  a|   1|
+---+----+

我想应该是这样的:

+---+----+
|  k|   v|
+---+----+
|  b|   3|
|  b|   3|
|  a|   1|
|  a|   1|
|  a|   1|
+---+----+

因为,我通过 orderBy 对 'k' 和 'v' 的操作显示了 df:

df.orderBy('k','v').show()
    +---+----+
    |  k|   v|
    +---+----+
    |  a|null|
    |  a|  -1|
    |  a|   1|
    |  b|   1|
    |  b|   3|
    +---+----+

此外,我想出了另一个解决方案来测试这类问题,我的代码如下:

df.orderBy('k','v').groupBy('k').agg(F.first('v')).show()

我发现每次在上面运行后它的结果可能都不一样。有人遇到和我一样的经历吗?我希望在我的项目中使用这两个功能,但我发现这些解决方案都没有定论。

【问题讨论】:

看来你需要f.max 我只是用那些代码来模拟我遇到的情况。在我的项目中,我实际上需要 first 和 last 之类的功能。也许,有一些替代方案。 【参考方案1】:

尝试使用.desc() 反转排序顺序,然后first() 将给出所需的输出。

w2 = Window().partitionBy("k").orderBy(df.v.desc())
df.select(F.col("k"), F.first("v",True).over(w2).alias('v')).show()
F.first("v",True).over(w2).alias('v').show()

输出:

+---+---+
|  k|  v|
+---+---+
|  b|  3|
|  b|  3|
|  a|  1|
|  a|  1|
|  a|  1|
+---+---+

您还应该注意 partitionBy 与 orderBy。由于您按“k”进行分区,因此任何给定窗口中的所有 k 值都是相同的。按“k”排序什么都不做。

就返回窗口中的哪个项目而言,最后一个函数与第一个函数并不真正相反。它返回它看到的最后一个非空值,因为它在有序行中前进。

为了比较它们的效果,这里有一个包含功能/排序组合的数据框。请注意,在“last_w2”列中,空值已被 -1 替换。

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)]).toDF(["k", "v"])

#create two windows for comparison.
w = Window().partitionBy("k").orderBy('v')
w2 = Window().partitionBy("k").orderBy(df.v.desc())

df.select('k','v',
   F.first("v",True).over(w).alias('first_w1'),
   F.last("v",True).over(w).alias('last_w1'),
   F.first("v",True).over(w2).alias('first_w2'),
   F.last("v",True).over(w2).alias('last_w2')
).show()

输出:

+---+----+--------+-------+--------+-------+
|  k|   v|first_w1|last_w1|first_w2|last_w2|
+---+----+--------+-------+--------+-------+
|  b|   1|       1|      1|       3|      1|
|  b|   3|       1|      3|       3|      3|
|  a|null|    null|   null|       1|     -1|
|  a|  -1|      -1|     -1|       1|     -1|
|  a|   1|      -1|      1|       1|      1|
+---+----+--------+-------+--------+-------+

【讨论】:

我不明白,为什么last_w2不是1、1、-1、-1、-1? spark 文档说The function is non-deterministic because its results depends on the order of the rows which may be non-deterministic after a shuffle. 这是否意味着每个分区中的第一个已给出并且这可以根据洗牌而改变? spark.apache.org/docs/latest/api/python/reference/api/…【参考方案2】:

看看Question 47130030。 问题不在于 last() 函数,而在于框架,该框架仅包括当前行之前的行。 使用

w = Window().partitionBy("k").orderBy('k','v').rowsBetween(W.unboundedPreceding,W.unboundedFollowing)

将为 first() 和 last() 产生正确的结果。

【讨论】:

以上是关于如何在pyspark中使用第一个和最后一个函数?的主要内容,如果未能解决你的问题,请参考以下文章

从pyspark中的文本文件中删除第一行和最后一行

PySpark - 如何将列表传递给用户定义函数?

如何在PySpark中调用python函数?

快速泛型返回第一个和最后一个元素

php中怎么删除数组的第一个元素和最后一个元素

如何在 QComboBox 中制作 addItem 函数以在第一个 positionfist 中添加最后一个 Item