在 Spark 中过滤 KeyValueGrouped 数据集
Posted
技术标签:
【中文标题】在 Spark 中过滤 KeyValueGrouped 数据集【英文标题】:filter KeyValueGrouped Dataset in spark 【发布时间】:2018-10-01 12:53:20 【问题描述】:我有一个自定义类的类型化数据集,并在其上使用 groupbykey 方法。你知道它会产生一个 KeyValueGroupedDataset。我想过滤这个新的数据集,但是这种类型的数据集没有过滤方法。所以,我的问题是:如何过滤这种类型的数据集? (需要Java解决方案。火花版本:2.3.1)。
样本数据:
"id":1,"fname":"Gale","lname":"Willmett","email":"gwillmett0@nhs.uk","gender":"Female"
"id":2,"fname":"Chantalle","lname":"Wilcher","email":"cwilcher1@blinklist.com","gender":"Female"
"id":3,"fname":"Polly","lname":"Grandisson","email":"pgrandisson2@linkedin.com","gender":"Female"
"id":3,"fname":"Moshe","lname":"Pink","email":"mpink3@twitter.com","gender":"Male"
"id":2,"fname":"Yorke","lname":"Ginnelly","email":"yginnelly4@apple.com","gender":"Male"
我做了什么:
Dataset<Person> peopleDS = spark.read().format("parquet").load("\path").as(Encoders.bean(Person.class));
KeyValueGroupedDataset<String, Person> KVDS = peopleDS.groupByKey( (MapFunction<Person, String> ) f -> f.getGender() , Encoders.STRING());
//How Can I filter on KVDS's id field?
Update1(使用 flatMapGroups):
Dataset<Person> persons = KVDS.flatMapGroups((FlatMapGroupsFunction <String,Person,Person>) (f,k) -> (Iterator<Person>) k , Encoders.bean(Person.class));
Update2(使用 MapGroups)
Dataset<Person> peopleMap = KVDS.mapGroups((MapGroupsFunction <String,Person,Person>) (f,g) ->
while (g.hasNext())
//What can I do here?
,Encoders.bean(Person.Class);
Update3 :我想过滤那些他们的 id 大于 1 的组。例如在下图中:我只想要女性组,因为他们的 id 大于 1(首先字段为id,其他为fname、lname、email和gender)。
Update4:我用“RDD”做了我想要的,但我想用“Dataset”做这部分代码:
List<Tuple2<String, Iterable<Person>>> f = PersonRDD
.mapToPair(s -> new Tuple2<>(s.getGender(), s)).groupByKey()
.filter(t -> ((Collection<Person>) t._2()).stream().mapToInt(e -> e.getId).distinct().count() > 1)
.collect();
【问题讨论】:
对我来说很难回答的问题 简单的问题形式:我有一个按“性别”分组的“人”数据集,现在我想过滤结果。我该怎么做? 真的很简单 你的解决方案是什么? 我看到你更新了。为考试而学习,但会介于两者之间 【参考方案1】:为什么不在分组前过滤 id ? GroupByKey 是一项昂贵的操作,首先过滤应该更快。
如果你真的想先分组,你可能不得不使用带有标识函数的 .flatMapGroups。
不确定 java 代码,但 scala 版本如下:
peopleDS
.groupByKey(_.gender)
.mapGroups case (gender, persons) => persons.filter(your condition)
但同样,您应该先过滤 :)。特别是因为您的 ID 字段在分组之前已经可用。
【讨论】:
我不先过滤,因为我想要针对不同性别的不同过滤器。 而“mapGroups”给了我一个值的迭代器,我不知道它对我有什么帮助。请参阅我的问题中的 update2。 你能发布你的过滤逻辑吗?你想保留哪些元素?不要打扰 mapGroups,我的意思是 flatMapGroups。【参考方案2】:分组用于聚合函数,您可以在“KeyValueGroupedDataset”类中找到“agg”等函数。如果您为 ex 应用聚合函数。 “count”,你会得到“Dataset”,并且可以使用“filter”功能。
没有聚合函数的“groupBy”看起来很奇怪,其他函数,例如。可以使用“distinct”。
使用“FlatMapGroupsFunction”过滤示例:
.flatMapGroups(
(FlatMapGroupsFunction<String, Person, Person>) (f, k) ->
List<Person> result = new ArrayList<>();
while (k.hasNext())
Person value = k.next();
// filter condition here
if (value != null)
result.add(value);
return result.iterator();
,
Encoders.bean(Person.class))
【讨论】:
我同意你的观点:“分组用于聚合函数”,但在这种情况下我不需要聚合函数。我只需要将性别字段作为“键”并将所有对象(人)的列表作为“值”,然后我想通过过滤来处理“值”。有没有比 groupbykey 更好的方法? 对于过滤器,只能使用“flatMapGroups”值。 我使用 flatMapGroup 函数更新了我的问题,但我认为 flatMapGroups 在分组之前给了我相同的数据集。 请参阅我的问题的 Update4。我用 RDD 做了我想做的事。我想将该部分完全转换为数据集。 数据集可以转换为RDD,并应用“Update4”中的代码:“ad.rdd().toJavaRDD()”以上是关于在 Spark 中过滤 KeyValueGrouped 数据集的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark 中过滤 KeyValueGrouped 数据集