Spark将多组行过滤为单行
Posted
技术标签:
【中文标题】Spark将多组行过滤为单行【英文标题】:Spark filter multiple group of rows to a single row 【发布时间】:2018-05-15 16:46:17 【问题描述】:我正在努力实现以下目标,
假设我有一个包含以下列的数据框
id | name | alias
-------------------
1 | abc | short
1 | abc | ailas-long-1
1 | abc | another-long-alias
2 | xyz | short_alias
2 | xyz | same_length
3 | def | alias_1
我想按 id 和 name 分组并选择较短的别名,
我期待的输出是
id | name | alias
-------------------
1 | abc | short
2 | xyz | short_alias
3 | def | alias_1
我可以使用 window 和 row_number 来实现这一点,有没有其他有效的方法可以获得相同的结果。一般来说,第三列过滤条件可以是任何东西,在这种情况下是字段的长度。
任何帮助将不胜感激。
谢谢。
【问题讨论】:
查看您预期的数据框,似乎简单的过滤器就可以解决问题。df.filter(df['alias'] == 'short-alias')
不,这些是样本,可以有任何值。我将编辑问题以使其清楚
【参考方案1】:
您需要做的就是使用length
内置函数并在window
函数中使用它
from pyspark.sql import functions as f
from pyspark.sql import Window
windowSpec = Window.partitionBy('id', 'name').orderBy('length')
df.withColumn('length', f.length('alias'))\
.withColumn('length', f.row_number().over(windowSpec))\
.filter(f.col('length') == 1)\
.drop('length')\
.show(truncate=False)
这应该给你
+---+----+-----------+
|id |name|alias |
+---+----+-----------+
|3 |def |alias_1 |
|1 |abc |short |
|2 |xyz |short_alias|
+---+----+-----------+
【讨论】:
上面代码中的window有什么用,我看不到它的用法。 谢谢,我使用了你之前提到的使用 row_number 的方法,因为我需要最后一列的长度最短。如果我先使用,它不会确保它总是较短的长度。我想知道是否还有其他方法。 是的,你是绝对正确的@Murali。又是我的错。我已经更新了我的答案,供您接受和投票。 :) 谢谢。我认为这是我心目中最好的方法。【参考方案2】:没有窗口的解决方案(不是很漂亮..),我认为最简单的 rdd 解决方案:
from pyspark.sql import functions as F
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rdd = sc.parallelize([(1 , "abc" , "short-alias"),
(1 , "abc" , "short"),
(1 , "abc" , "ailas-long-1"),
(1 , "abc" , "another-long-alias"),
(2 , "xyz" , "same_length"),
(2 , "xyz" , "same_length1"),
(3 , "def" , "short_alias") ])
df = hiveCtx.createDataFrame(\
rdd, ["id", "name", "alias"])
len_df = df.groupBy(["id", "name"]).agg(F.min(F.length("alias")).alias("alias_len"))
df = df.withColumn("alias_len", F.length("alias"))
cond = ["alias_len", "id", "name"]
df.join(len_df, cond).show()
print rdd.map(lambda x: ((x[0], x[1]), x[2]))\
.reduceByKey(lambda x,y: x if len(x) < len(y) else y ).collect()
输出:
+---------+---+----+-----------+
|alias_len| id|name| alias|
+---------+---+----+-----------+
| 11| 3| def|short_alias|
| 11| 2| xyz|same_length|
| 5| 1| abc| short|
+---------+---+----+-----------+
[((2, 'xyz'), 'same_length'), ((3, 'def'), 'short_alias'), ((1, 'abc'), 'short')]
【讨论】:
以上是关于Spark将多组行过滤为单行的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark 中的复杂过滤从 elasticsearch 中获取 esJsonRDD