Spark reduceByKey() 返回一个复合值
Posted
技术标签:
【中文标题】Spark reduceByKey() 返回一个复合值【英文标题】:Spark reduceByKey() to return a compound value 【发布时间】:2020-02-08 18:05:38 【问题描述】:我是 Spark 的新手,偶然发现了以下(可能很简单)问题。
我有一个带有键值元素的 RDD,每个值都是一个(字符串,数字)对。 例如键值对是 ('A', ('02', 43))。
我想通过在元素(键和整体值)共享相同键时保持最大数量来减少此 RDD。
reduceByKey() 似乎相关,我选择了这个 MWE。
sc= spark.sparkContext
rdd = sc.parallelize([
('A', ('02', 43)),
('A', ('02', 36)),
('B', ('02', 306)),
('C', ('10', 185))])
rdd.reduceByKey(lambda a,b : max(a[1],b[1])).collect()
产生
[('C', ('10', 185)), ('A', 43), ('B', ('02', 306))]
我的问题是我想得到:
[('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]
也就是说,我不知道如何返回 ('A',('02',43)) 而不仅仅是 ('A',43)。
【问题讨论】:
【参考方案1】:我找到了解决这个简单问题的方法。 为 reduceByKey() 定义一个函数而不是使用内联函数。 这是:
def max_compound(a,b):
if (max(a[1],b[1])==a[1]):
return a
else:
return b
然后调用:
rdd.reduceByKey(max_compound).collect()
【讨论】:
【参考方案2】:以下代码在Scala
,希望你能把同样的逻辑转换成pyspark
val rdd = sparkSession.sparkContext.parallelize(Array(('A', (2, 43)), ('A', (2, 36)), ('B', (2, 306)), ('C', (10, 185))))
val rdd2 = rdd.reduceByKey((a, b) => (Math.max(a._1, b._1), Math.max(a._2, b._2)))
rdd2.collect().foreach(println)
输出:
(B,(2,306))
(A,(2,43))
(C,(10,185))
【讨论】:
我没有明确说明我想保持第一个对元素与最大数量(第二对元素)相关联。即 ('A', (1, 43)), ('A', (2, 36) 将返回 ('A', (1, 43))。我猜你的答案可能会混合情侣,在这种情况下返回(('A', (2,43))。 @user1551605 要做到这一点,您可以将第一个max
函数更改为 min
函数,这应该可以解决问题...
不,这仅适用于该示例。如前所述,一般情况是选择 (u,v) 对,其中 v 为最大值,无论 u。以上是关于Spark reduceByKey() 返回一个复合值的主要内容,如果未能解决你的问题,请参考以下文章
Spark中groupByKey() 和 reduceByKey() 和combineByKey()
Spark中groupByKey() 和 reduceByKey() 和combineByKey()
Spark中groupByKey() 和 reduceByKey() 和combineByKey()
Spark中groupByKey() 和 reduceByKey() 和combineByKey()