使用3个元素元组时为什么会出现CombineByKey错误?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用3个元素元组时为什么会出现CombineByKey错误?相关的知识,希望对你有一定的参考价值。
我正在尝试使用scala来计算我的数据的一些统计数据。我有一系列元组
agency_time_map = Array(("LA", 2), ("NY", 4), ...)
我正在尝试使用CombineByKey:
val combiner = (x: Double) => (1, x, x*x)
val merger = (x: (Int, Double, Double), y: (Double, Double)) => {
val (c, acc_1, acc_2) = x
val (y_1, y_2) = y
(c+1, acc_1 + y_1, acc_2 + y_2)
}
val mergeAndCombiner = (x1: (Int, Double, Double), x2: (Int, Double, Double)) => {
val (c1, acc1_1, acc1_2) = x1
val (c2, acc2_1, acc2_2) = x2
(c1+c2, acc1_1 + acc2_1, acc1_2 + acc2_2)
}
在我的数据上使用它时出现以下错误:
agency_time_map.combineByKey(combiner,merger,mergeAndCombiner)
<console>:32: error: type mismatch;
found : ((Int, Double, Double), (Double, Double)) => (Int, Double, Double)
required: (?, Double) => ?
agency_time_map.combineByKey(combiner,merger,mergeAndCombiner)
我一直试图把它弄清楚,但对错误的解释是相当困难的。
我怀疑问题来自合并器或合并方法。
你能帮我解决一下这个问题吗?
谢谢
那是因为signature of combine by key, define in context of RDD[(K, V)]
is
def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
这意味着mergeValue
的第二个参数的类型 - V
mergeValue: (C, V) ⇒ C
必须匹配第二种类型的参数和createCombiner
的参数类型。
如果createCombiner
定义为
val combiner = (x: Double) => (1, x, x*x)
这是(Double) => (Int, Double Double)
然后mergeValue
必须是:
(C, Double) => C
其中C
是(Int, Double Double)
- 返回类型createCombiner
。
如果你的代码中createCombiner
是(C, (Double, Double)) => C
,这显然是不满意的。
让我们将实际实现作为用户的练习 - 因为这个问题看起来很接近于家庭作业。
以上是关于使用3个元素元组时为什么会出现CombineByKey错误?的主要内容,如果未能解决你的问题,请参考以下文章
将每个元素视为元组时,在 PySpark 中加入 2 个 RDD
当我的模式只包含一个组时,为啥 re.findall 会返回一个元组列表?