使用3个元素元组时为什么会出现CombineByKey错误?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用3个元素元组时为什么会出现CombineByKey错误?相关的知识,希望对你有一定的参考价值。

我正在尝试使用scala来计算我的数据的一些统计数据。我有一系列元组

agency_time_map = Array(("LA", 2), ("NY", 4), ...)

我正在尝试使用CombineByKey:

val combiner = (x: Double) => (1, x, x*x)

val merger = (x: (Int, Double, Double), y: (Double, Double)) => {
    val (c, acc_1, acc_2) = x
    val (y_1, y_2) = y
    (c+1, acc_1 + y_1, acc_2 + y_2)
}

val mergeAndCombiner = (x1: (Int, Double, Double), x2: (Int, Double, Double)) => {
    val (c1, acc1_1, acc1_2) = x1
    val (c2, acc2_1, acc2_2) = x2
    (c1+c2, acc1_1 + acc2_1, acc1_2 + acc2_2)
}

在我的数据上使用它时出现以下错误:

agency_time_map.combineByKey(combiner,merger,mergeAndCombiner)

<console>:32: error: type mismatch;
 found   : ((Int, Double, Double), (Double, Double)) => (Int, Double, Double)
 required: (?, Double) => ?
       agency_time_map.combineByKey(combiner,merger,mergeAndCombiner)

我一直试图把它弄清楚,但对错误的解释是相当困难的。

我怀疑问题来自合并器或合并方法。

你能帮我解决一下这个问题吗?

谢谢

答案

那是因为signature of combine by key, define in context of RDD[(K, V)] is

def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

这意味着mergeValue的第二个参数的类型 - V

mergeValue: (C, V) ⇒ C

必须匹配第二种类型的参数和createCombiner的参数类型。

如果createCombiner定义为

val combiner = (x: Double) => (1, x, x*x)

这是(Double) => (Int, Double Double)然后mergeValue必须是:

(C, Double) => C

其中C(Int, Double Double) - 返回类型createCombiner

如果你的代码中createCombiner(C, (Double, Double)) => C,这显然是不满意的。

让我们将实际实现作为用户的练习 - 因为这个问题看起来很接近于家庭作业。

以上是关于使用3个元素元组时为什么会出现CombineByKey错误?的主要内容,如果未能解决你的问题,请参考以下文章

将每个元素视为元组时,在 PySpark 中加入 2 个 RDD

当我的模式只包含一个组时,为啥 re.findall 会返回一个元组列表?

在以下代码中返回对/结构/元组时的参数顺序是不是重要?

当键是一个只知道第一个元素的元组时,如何获取scala Map值?

将列表作为单个元素插入到元组中

元组(tuple)