如何获得 Spark RDD 的 SQL row_number 等效项?
Posted
技术标签:
【中文标题】如何获得 Spark RDD 的 SQL row_number 等效项?【英文标题】:How do I get a SQL row_number equivalent for a Spark RDD? 【发布时间】:2015-01-18 22:37:48 【问题描述】:我需要为包含许多列的数据表生成完整的行号列表。
在 SQL 中,这看起来像这样:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
现在,假设在 Spark 中我有一个形式为 (K, V) 的 RDD,其中 V=(col1, col2, col3),所以我的条目是这样的
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
我想使用 sortBy()、sortWith()、sortByKey()、zipWithIndex 等命令对它们进行排序,并拥有一个具有正确 row_number 的新 RDD
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(我不关心括号,所以形式也可以是(K, (col1,col2,col3,rownum)) 代替)
我该怎么做?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
还要注意,函数 sortBy 不能直接应用于 RDD,必须先运行 collect(),然后输出的也不是 RDD,而是数组
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
这里有一点进展,但还没有分区:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
【问题讨论】:
这个问题是其他几个部分回答的问题的延伸,即***.com/questions/23838614/…、qnalist.com/questions/5086896/…、mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/…、***.com/questions/27022059/…、***.com/questions/24677180/… 我也想回答这个问题。 Hive added analytic functions (includingrow_number()
) in 0.11,Spark 1.1 支持 HiveQL / Hive 0.12。所以看起来sqlContext.hql("select row_number() over(partition by ...
应该可以工作,但我遇到了一个错误。
【参考方案1】:
这是您提出的一个有趣的问题。我会用 Python 回答这个问题,但我相信你能够无缝地翻译成 Scala。
以下是我的处理方法:
1- 简化您的数据:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 现在是一个“真正的”键值对。看起来是这样的:
[
((3, 4), (5, 5, 5)),
((3, 4), (5, 5, 9)),
((3, 4), (7, 5, 5)),
((1, 2), (1, 2, 3)),
((1, 2), (1, 4, 7)),
((1, 2), (2, 2, 3))
]
2- 然后,使用 group-by 函数重现 PARTITION BY 的效果:
temp3 = temp2.groupByKey()
temp3 现在是一个有 2 行的 RDD:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3- 现在,您需要为 RDD 的每个值应用一个排名函数。在 python 中,我会使用简单的排序函数(枚举将创建你的 row_number 列):
temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
请注意,要实现您的特定订单,您需要提供正确的“关键”参数(在 python 中,我只需创建一个类似的 lambda 函数:
lambda tuple : (tuple[0],-tuple[1],tuple[2])
最后(没有key参数函数,看起来是这样的):
[
((1, 2), ((1, 2, 3), 0)),
((1, 2), ((1, 4, 7), 1)),
((1, 2), ((2, 2, 3), 2)),
((3, 4), ((5, 5, 5), 0)),
((3, 4), ((5, 5, 9), 1)),
((3, 4), ((7, 5, 5), 2))
]
希望对您有所帮助!
祝你好运。
【讨论】:
第 3 步是天才!【参考方案2】:row_number() over (partition by ... order by ...)
功能已添加到 Spark 1.4。此答案使用 PySpark/DataFrames。
创建一个测试数据框:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
(Row(k="key1", v=(1,2,3)),
Row(k="key1", v=(1,4,7)),
Row(k="key1", v=(2,2,3)),
Row(k="key2", v=(5,5,5)),
Row(k="key2", v=(5,5,9)),
Row(k="key2", v=(7,5,5))
)
).toDF()
添加分区行号:
from pyspark.sql.window import Window
(testDF
.select("k", "v",
F.rowNumber()
.over(Window
.partitionBy("k")
.orderBy("k")
)
.alias("rowNum")
)
.show()
)
+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+
【讨论】:
【参考方案3】:val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
测试:Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,( 7,8,9)), (key2,(0,1,2)))
test.foreach(println)
(key1,(1,2,3))
(key1,(4,5,6))
(key2,(7,8,9))
(key2,(0,1,2))
val rdd = sc.parallelize(test, 2)
rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26
val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))
rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25
val rdd2 = rdd1.flatMap
elem =>
val key = elem._1
elem._2.map(row => (key, row._1, row._2))
rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at :25
rdd2.collect.foreach(println)
(key1,(1,2,3),0)
(key1,(4,5,6),1)
(key2,(0,1,2),0)
(key2,(7,8,9),1)
【讨论】:
【参考方案4】:从 spark sql,读取数据文件... val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");
上述文件有字段 user_id、pageviews 和 clicks
生成按 user_id 分区并按点击排序的活动 Id (row_number)
val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));
【讨论】:
以上是关于如何获得 Spark RDD 的 SQL row_number 等效项?的主要内容,如果未能解决你的问题,请参考以下文章
如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row