使用 Apache Spark 将键值对缩减为键列表对

Posted

技术标签:

【中文标题】使用 Apache Spark 将键值对缩减为键列表对【英文标题】:Reduce a key-value pair into a key-list pair with Apache Spark 【发布时间】:2015-01-16 02:30:29 【问题描述】:

我正在编写一个 Spark 应用程序,并希望将一组键值对 (K, V1), (K, V2), ..., (K, Vn) 组合成一个键多值对 (K, [V1, V2, ..., Vn])。我觉得我应该能够使用 reduceByKey 函数来做到这一点:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

发生这种情况时我得到的错误是:

“NoneType”对象没有“附加”属性。

我的键是整数,值 V1,...,Vn 是元组。我的目标是使用键和值列表(元组)创建一对。

【问题讨论】:

【参考方案1】:

Map 和 ReduceByKey

reduce 的输入类型和输出类型必须相同,因此如果要聚合一个列表,则必须将map 输入到列表中。然后将这些列表合并为一个列表。

组合列表

您需要一种将列表合并为一个列表的方法。 Python 提供了一些methods to combine lists。

append 修改第一个列表,将始终返回None

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend 做同样的事情,但解包列表:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

这两种方法都返回None,但您需要一个返回组合列表的方法,因此只需use the plus sign。

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

火花

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
         .map(lambda actor: (actor.split(",")[0], actor)) \ 

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)

组合键

也可以用combineByKey来解决这个问题,它在内部用于实现reduceByKey,但它更复杂,"using one of the specialized per-key combiners in Spark can be much faster"。对于上面的解决方案,您的用例已经足够简单了。

GroupByKey

也可以使用groupByKey、but it reduces parallelization 解决此问题,因此对于大数据集可能会慢得多。

【讨论】:

在这种情况下 ReduceByKey 是否比 GroupByKey 快?它产生相同的结果,那么哪个更好?有没有办法从 ReduceByKey 生成的最终列表中删除重复项? @Sofia:如前所述,GroupByKey 减少了并行化,但如果您使用的是小型数据集,这可能不是问题。只有性能测试才能给你一个具体的答案。使用 ReduceByKey 时,删除重复值不是内置的,但您可以轻松添加另一个步骤来执行此操作,或者创建自己的 Create 方法来处理它。 糟糕,我的意思是“你可以创建自己的 Combine 方法”。 使用+ 会强制在每次追加时复制不断增长的列表,在每个列表的最终长度中花费时间二次方。 extend() 是正确的答案——你将它包装在一个函数中,该函数返回(增长的)左侧list 会保持列表的顺序吗?【参考方案2】:

tl;dr 如果您确实需要这样的操作,请使用 groupByKey as suggested by @MariusIon。与直接分组相比,这里提出的所有其他解决方案要么直接效率低下,要么至少次优。

reduceByKey 使用列表连接不是可接受的解决方案,因为:

需要初始化 O(N) 个列表。 + 对一对列表的每次应用都需要两个列表的完整副本 (O(N)),从而有效地将整体复杂度增加到 O(N2 sup>). 没有解决groupByKey 引入的任何问题。必须洗牌的数据量以及最终结构的大小是相同的。 与suggested by one of the answers 不同,使用reduceByKeygroupByKey 实现的并行度没有区别。

combineByKeylist.extend 是次优解决方案,因为:

MergeValue 中创建O(N) 个列表对象(这可以通过直接在新项目上使用list.append 来优化)。 如果使用 list.append 进行优化,则它完全等同于 groupByKey 的旧 (Spark

【讨论】:

【参考方案3】:

我的谈话有点晚了,但这是我的建议:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]

【讨论】:

嗨,您也可以为此提供equivalent Java code 的帮助。我想在 Java 中实现类似的东西 map(lambda (x,y): (x, [y])) 已经解决了连接问题(而不是合并)。谢谢。【参考方案4】:

您可以使用 RDD groupByKey 方法。

输入:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

输出:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]

【讨论】:

不鼓励使用groupByKey,因为它会导致过度洗牌。正如@Christian_Strempfer 所建议的,您应该使用reduceByKey (see this link) 或combineByKey【参考方案5】:

如果你想做一个reduceByKey,其中减少的KV对中的类型与原始KV对中的类型不同,那么可以使用函数combineByKey。该函数所做的是获取 KV 对并将它们(通过 Key)组合成 KC 对,其中 C 与 V 的类型不同。

一个指定3个函数,createCombiner、mergeValue、mergeCombiners。第一个指定如何将类型 V 转换为类型 C,第二个描述如何将类型 C 与类型 V 组合,最后一个指定如何将类型 C 与另一个类型 C 组合。我的代码创建了 KV 对:

定义3个函数如下:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

那么,My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

我发现使用此功能的最佳资源是:http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

正如其他人指出的那样,a.append(b)a.extend(b) 返回 None。所以reduceByKey(lambda a, b: a.append(b)) 在第一对 KV 对上返回 None,然后在第二对上失败,因为 None.append(b) 失败。你可以通过定义一个单独的函数来解决这个问题:

 def My_Extend(a,b):
      a.extend(b)
      return a

然后调用reduceByKey(lambda a, b: My_Extend(a,b))(这里可能不需要使用lambda函数,但我没有测试过这种情况。)

【讨论】:

【参考方案6】:

错误消息源于闭包中“a”的类型。

 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

让 pySpark 将 a 显式评估为列表。例如,

My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

在很多情况下,reduceByKey 会比 groupByKey 更可取,参考: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

【讨论】:

【参考方案7】:

我尝试使用 combineByKey,这是我的步骤

combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()

输出:

[('A', [3, 9, 12]), ('B', [4, 10, 11])]

    为组合器定义一个函数,将累加器设置为在分区内遇到的第一个键值对,在此步骤中将值转换为列表

    定义一个函数,将同一键的新值合并到步骤 1 中捕获的累加器值注意:-将值转换为在此函数中列出,因为累加器值已在第一步中转换为列表

    定义函数以合并各个分区的组合器输出。

【讨论】:

【参考方案8】:

好的。我希望,我做对了。你的输入是这样的:

kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

你想得到这样的东西:

kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

那么这可能会起作用(请参阅here):

d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

如果我弄错了,请告诉我,以便我根据您的需要进行调整。

P.S.:a.append([b]) 总是返回 None。您可能想观察[b]a,但不是append 的结果。

【讨论】:

所以你对我所拥有的 kv_input 和我想要的 kmv_output 的想法是完全正确的。我相信您的代码适用于串行 python,但是因为我使用 Spark 并行执行操作,所以我的 kv_input 的类型为 RDD(弹性分布式数据)......这是不可迭代的(所以我不能为 k 做类似的事情,v 在 kv_input)。 啊。好的。我的错,不知道火花。我在这里为那些不知道/注意到这一点的人提供答案。像我一样:P 不用担心。我对它很陌生,感谢您花时间演示此解决方案。 附言很有帮助。我对 retList = a.append([b]) 做了一个快速更改,然后返回 retList 这解决了第一个问题,但是我有一个新的小问题我应该能够解决(代码生成一个包含两个元组的列表和列表)。 @TravisJ:您需要使用extend 而不是append,就像我在回答中所做的那样。另请参阅Python - append vs. extend。【参考方案9】:

我在寻找相同问题的 java 示例时点击了此页面。 (如果你的情况类似,这是我的例子)

诀窍是 - 您需要为键分组。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample 

    public static void main(String[] args) 
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))            // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    

【讨论】:

【参考方案10】:

希望你有这样的输入数据

10 1
10 2
20 4
20 7
20 9

你想要这样的输出

10-1,2
20-4,7,9

你可以这样做

rdd=sc.textFile("location_of_file") 

def parse(line):
    fields=line.split(" ")
    return (fields[0],fields[1])

rdd1=rdd.map(parse) //parse func is for having the input as key,value pair
rdd1.groupByKey().mapValues(list).collect()

【讨论】:

正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。

以上是关于使用 Apache Spark 将键值对缩减为键列表对的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Spark/Scala 从 JSON 嵌套键值对创建列和值

将字符串数组转换为键值对

如何将键值对添加到字典中? [复制]

Pyspark 将行数据转换为键值对

如何在php中将对象数组转换为键值对

如何在运行时将键值对存储在应用程序设置中?