Spark:按组对记录进行排序?
Posted
技术标签:
【中文标题】Spark:按组对记录进行排序?【英文标题】:Spark: Sort records in groups? 【发布时间】:2015-02-16 14:21:43 【问题描述】:我有一组我需要的记录:
1) 按“日期”、“城市”和“种类”分组
2) 按“奖品”对每个组进行排序
在我的代码中:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object Sort
case class Record(name:String, day: String, kind: String, city: String, prize:Int)
val recs = Array (
Record("n1", "d1", "k1", "c1", 10),
Record("n1", "d1", "k1", "c1", 9),
Record("n1", "d1", "k1", "c1", 8),
Record("n2", "d2", "k2", "c2", 1),
Record("n2", "d2", "k2", "c2", 2),
Record("n2", "d2", "k2", "c2", 3)
)
def main(args: Array[String]): Unit =
val conf = new SparkConf()
.setAppName("Test")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val rs = sc.parallelize(recs)
val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2)
val x = rsGrp.mapr =>
val lst = r.toList
lst.mape => (e.prize, e)
x.sortByKey()
当我尝试对组进行排序时出现错误:
value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int,
Sort.Record)]]
怎么了?如何排序?
【问题讨论】:
如果你将排序参数作为键的一部分,看起来你也可以使用 repartitionAndSortWithinPartitions() 来获得“tera-sort”规模。见spark.apache.org/docs/1.3.0/api/scala/… 【参考方案1】:您需要定义一个 Key 然后 mapValues 对它们进行排序。
import org.apache.spark.SparkContext, SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
object Sort
case class Record(name:String, day: String, kind: String, city: String, prize:Int)
// Define your data
def main(args: Array[String]): Unit =
val conf = new SparkConf()
.setAppName("Test")
.setMaster("local")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val rs = sc.parallelize(recs)
// Generate pair RDD neccesary to call groupByKey and group it
val key: RDD[((String, String, String), Iterable[Record])] = rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey
// Once grouped you need to sort values of each Key
val values: RDD[((String, String, String), List[Record])] = key.mapValues(iter => iter.toList.sortBy(_.prize))
// Print result
values.collect.foreach(println)
【讨论】:
我在 spark 文档中读到 groupBy 的成本很高。是否有其他方法可以更有效地实现这一目标。 我不知道其他对值进行排序更有效的方法。 Group By Key 通常不会单独使用,因为您将使用这些值进行归约或其他操作。 这是将整个分组转换为内存中的列表。如果这个分组很大怎么办?【参考方案2】:groupByKey 很昂贵,它有两个含义:
-
平均而言,大部分数据在剩余的 N-1 个分区中被打乱。
同一键的所有记录都会在单个执行程序中加载到内存中,这可能会导致内存错误。
根据您的用例,您有不同的更好选择:
-
如果您不关心排序,请使用 reduceByKey 或 aggregateByKey。
如果您只想在不进行任何转换的情况下进行分组和排序,最好使用 repartitionAndSortWithinPartitions (Spark 1.3.0+ http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions),但要非常小心您指定的分区器并对其进行测试,因为您现在依赖于可能会产生的副作用在不同的环境中改变行为。另请参阅此存储库中的示例:https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala。
如果您将转换或不可约聚合(折叠或扫描)应用于已排序记录的可迭代,请查看此库:spark-sorted https://github.com/tresata/spark-sorted。它为配对 rdds 提供了 3 个 API:mapStreamByKey、foldLeftByKey 和 scanLeftByKey。
【讨论】:
【参考方案3】:将map
替换为flatMap
val x = rsGrp.mapr =>
val lst = r.toList
lst.mape => (e.prize, e)
这会给你一个
org.apache.spark.rdd.RDD[(Int, Record)] = FlatMappedRDD[10]
然后您可以在上面的 RDD 上调用 sortBy(_._1)。
【讨论】:
【参考方案4】:作为@gasparms 解决方案的替代方案,我认为可以尝试使用过滤器,然后执行 rdd.sortyBy 操作。您过滤满足关键条件的每条记录。先决条件是您需要跟踪所有键(过滤器组合)。您也可以在遍历记录时构建它。
【讨论】:
以上是关于Spark:按组对记录进行排序?的主要内容,如果未能解决你的问题,请参考以下文章