Spark reduceByKey() 返回一个复合值

Posted

技术标签:

【中文标题】Spark reduceByKey() 返回一个复合值【英文标题】:Spark reduceByKey() to return a compound value 【发布时间】:2020-02-08 18:05:38 【问题描述】:

我是 Spark 的新手,偶然发现了以下(可能很简单)问题。

我有一个带有键值元素的 RDD,每个值都是一个(字符串,数字)对。 例如键值对是 ('A', ('02', 43))。

我想通过在元素(键和整体值)共享相同键时保持最大数量来减少此 RDD。

reduceByKey() 似乎相关,我选择了这个 MWE。

sc= spark.sparkContext
rdd = sc.parallelize([
 ('A', ('02', 43)),
 ('A', ('02', 36)),
 ('B', ('02', 306)),
 ('C', ('10', 185))])
rdd.reduceByKey(lambda a,b : max(a[1],b[1])).collect()

产生

[('C', ('10', 185)), ('A', 43), ('B', ('02', 306))]

我的问题是我想得到:

[('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]

也就是说,我不知道如何返回 ('A',('02',43)) 而不仅仅是 ('A',43)。

【问题讨论】:

【参考方案1】:

我找到了解决这个简单问题的方法。 为 reduceByKey() 定义一个函数而不是使用内联函数。 这是:

def max_compound(a,b):
 if (max(a[1],b[1])==a[1]):
   return a
 else: 
   return b

然后调用:

rdd.reduceByKey(max_compound).collect()

【讨论】:

【参考方案2】:

以下代码在Scala,希望你能把同样的逻辑转换成pyspark

val rdd = sparkSession.sparkContext.parallelize(Array(('A', (2, 43)), ('A', (2, 36)), ('B', (2, 306)), ('C', (10, 185))))

val rdd2 = rdd.reduceByKey((a, b) => (Math.max(a._1, b._1), Math.max(a._2, b._2)))

rdd2.collect().foreach(println)

输出:

(B,(2,306))
(A,(2,43))
(C,(10,185))

【讨论】:

我没有明确说明我想保持第一个对元素与最大数量(第二对元素)相关联。即 ('A', (1, 43)), ('A', (2, 36) 将返回 ('A', (1, 43))。我猜你的答案可能会混合情侣,在这种情况下返回(('A', (2,43))。 @user1551605 要做到这一点,您可以将第一个 max 函数更改为 min 函数,这应该可以解决问题... 不,这仅适用于该示例。如前所述,一般情况是选择 (u,v) 对,其中 v 为最大值,无论 u。

以上是关于Spark reduceByKey() 返回一个复合值的主要内容,如果未能解决你的问题,请参考以下文章

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark入门--Spark的reduce和reduceByKey

Spark中的treeReduce与reduceByKey