Pyspark应用不同的基于reduce函数的键
Posted
技术标签:
【中文标题】Pyspark应用不同的基于reduce函数的键【英文标题】:Pyspark apply different reduce function based key 【发布时间】:2018-08-09 19:41:26 【问题描述】:假设我有一些看起来像这样的数据
data =[('yes_sum', np.array([2, 2, 2])),
('yes_sum', np.array([3, 3, 3])),
('no_sum', np.array([4, 4, 4])),
('no_sum', np.array([6, 6, 6]))]
我将其转换为 rdd。
rdd_data = sc.parallelize(data)
我想用键 'yes_sum'
对数组求和,但将带有键 'no_sum'
的数组合并在一起。所以它看起来像这样:
[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]
我只知道如何通过键对数组求和:
rdd_data.reduceByKey(lambda x,y: x + y).collect()
我得到的:
[('yes_sum', array([5, 5, 5])), ('no_sum', array([10, 10, 10]))]
但这不是我想要的。我在想这样的事情:
rdd_data.reduceByKey(
lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))
).collect()
【问题讨论】:
【参考方案1】:首先,你的语法:
lambda x,y: if x.key() == 'yes_sum' x+y else np.concatenate((x, y))
不正确。相反,你可以写:
lambda x,y: x+y if x.key() == 'yes_sum' else np.concatenate((x, y))
但这会导致:
AttributeError: 'numpy.ndarray' object has no attribute 'key'
当您执行reduceByKey
时,reduce 函数本身不知道key
部分。 Spark 已经完成了将来自相似键的数据分组在一起并将其传递给适当的 reducer 的工作。
为了完成您想做的事情,您需要先filter
rdd
,然后再调用reduceByKey
。然后您可以根据过滤应用不同的reduce
函数,并合并您的结果。
例如:
yes_rdd = rdd_data.filter(lambda x: x[0] == 'yes_sum')\
.reduceByKey(lambda x,y: x + y)
no_rdd = rdd_data.filter(lambda x: x[0] != 'yes_sum')\
.reduceByKey(lambda x,y: np.concatenate((x, y)))
print(yes_rdd.union(no_rdd).collect())
#[('yes_sum', array([5, 5, 5])), ('no_sum', array([4, 4, 4, 6, 6, 6]))]
【讨论】:
以上是关于Pyspark应用不同的基于reduce函数的键的主要内容,如果未能解决你的问题,请参考以下文章
为同一个 reducer 函数收集不同的键 - HADOOP