pyspark reduce键是一个元组值嵌套列表

Posted

技术标签:

【中文标题】pyspark reduce键是一个元组值嵌套列表【英文标题】:pyspark reduce key being a tuple values nested lists 【发布时间】:2016-05-09 09:57:42 【问题描述】:

我的问题如下:我正在解析用户交互,每次检测到交互时我都会发出 ((user1,user2),((date1,0),(0,1)))。这里的零代表交互的方向。

我无法弄清楚为什么我不能使用以下 reduce 函数来减少此输出:

def myFunc2(x1,x2):
    return (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1][1]+x2[1][1])

我的映射器 (flatmap(myFunc)) 的输出是正确的:

((7401899, 5678002), ((1403185440.0, 0), (1, 0))) ((82628194, 22251869), ((0, 1403185452.0), (0, 1))) ((2162276, 98056200), ((1403185451.0, 0), (1, 0))) ((0509420, 4827510), ((1403185449.0, 0), (1, 0))) ((7974923, 9235930), ((1403185450.0, 0), (1, 0))) ((250259, 6876774), ((0, 1403185450.0), (0, 1))) ((642369, 6876774), ((0, 1403185450.0), (0, 1))) ((82628194, 22251869), ((0, 1403185452.0), (0, 1))) ((2162276, 98056200), ((1403185451.0, 0), (1, 0)))

但正在运行

lines.flatMap(myFunc) \
              .map(lambda x: (x[0], x[1])) \
              .reduceByKey(myFunc2)

给我错误

返回 (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][ 1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1 ][1]+x2[1][1])

TypeError: 'int' 对象没有属性 'getitem'

我想我在我的键中弄乱了一些东西,但我不知道为什么(我试图将键重铸为元组,如 here 所说但同样的错误)

有什么想法吗?非常感谢

【问题讨论】:

【参考方案1】:

好的,我认为这里的问题是您在没有您想象的那么深的项目中索引得太深。

我们来看看myFunc2

def myFunc2(x1,x2):
    return (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1][1]+x2[1][1])

鉴于您的上述问题,输入数据将如下所示:

((467401899, 485678002), ((1403185440.0, 0), (1, 0)))

让我们继续并将该数据行分配给一个变量。

x = ((467401899, 485678002), ((1403185440.0, 0), (1, 0)))

当我们运行x[0] 时会发生什么?我们得到(467401899, 485678002)。当我们运行x[1]?我们得到((1403185440.0, 0), (1, 0))。这就是你的map 声明所做的,我相信。

好的。这很清楚。

在您的函数myFunc2 中,您有两个参数x1x2。这些对应于上面的变量:x1 = x[0] = (467401899, 485678002)x2 = x[1] = ((1403185440.0, 0), (1, 0))

现在让我们只检查函数中return 语句的第一部分。

min(x1[0][0], x2[0][0])

所以,x1 = (467401899, 485678002)。凉爽的。现在,x1[0] 是什么?好吧,那是467401899。明显地。可是等等! x1[0][0] 是什么?您尝试在x1[0] 处获取项目的第零个索引,但x1[0] 处的项目不是listtuple,它只是int。而<type 'int'> 的对象没有名为getitem 的方法。

总而言之:您对嵌套不那么深的对象挖掘得太深了。仔细考虑您传递给 myFunc2 的内容,以及您的对象有多深。

我认为myFunc2 的返回语句的第一部分应该如下所示:

return min(x1[0], x2[0][0])。您可以在 x2 上进行更深入的索引,因为 x2 具有更深的嵌套元组!


当我运行以下命令时,它工作得很好:

a = sc.parallelize([((7401899, 5678002), ((1403185440.0, 0), (1, 0))),
((82628194, 22251869), ((0, 1403185452.0), (0, 1))),
((2162276, 98056200), ((1403185451.0, 0), (1, 0))),
((1509420, 4827510), ((1403185449.0, 0), (1, 0))),
((7974923, 9235930), ((1403185450.0, 0), (1, 0))),
((250259, 6876774), ((0, 1403185450.0), (0, 1))),
((642369, 6876774), ((0, 1403185450.0), (0, 1))),
((82628194, 22251869), ((0, 1403185452.0), (0, 1))),
((2162276, 98056200), ((1403185451.0, 0), (1, 0)))])

b = a.map(lambda x: (x[0], x[1])).reduceByKey(myFunc2)

b.collect()

[((1509420, 4827510), ((1403185449.0, 0), (1, 0))),
 ((2162276, 98056200), (1403185451.0, 1403185451.0, 0, 0, 2, 0)),
 ((7974923, 9235930), ((1403185450.0, 0), (1, 0))), 
 ((7401899, 5678002), ((1403185440.0, 0), (1, 0))), 
 ((642369, 6876774), ((0, 1403185450.0), (0, 1))), 
 ((82628194, 22251869), (0, 0, 1403185452.0, 1403185452.0, 0, 2)),
 ((250259, 6876774), ((0, 1403185450.0), (0, 1)))]

【讨论】:

我不确定是否会关注你。 myfunc2 是传递给减速器的函数吗? x1 = x[0] 是我的钥匙,是 (467401899, 485678002)。所以,我可能在这里遗漏了一些东西,但是 myfun2 需要两个输入,即具有相同键的两个值。所以 x1 和 x2 的格式应该是((1403185440.0, 0), (1, 0))。我想这是我在这里遗漏了一点,但我不明白为什么应该将密钥作为输入传递给减速器(措辞不好)。 我想我可能也很难理解你的例子。您能否在问题中添加更多数据,例如((467401899, 485678002), ((1403185440.0, 0), (1, 0)))?这样我就可以实际运行代码并尝试复制您的错误。 再次感谢您的回答。我认为问题可能来自flatmap 之后(不必要的)使用map,但我在map 之前和之后得到相同的输出以及相同的错误。我认为我的代码中有一个错误,用于使用键 (user_id_1,user_id_2) 对记录进行分组,但我无法弄清楚我做错了什么 @HorusH 根据您的示例数据,我在上面添加了新代码。

以上是关于pyspark reduce键是一个元组值嵌套列表的主要内容,如果未能解决你的问题,请参考以下文章

Scala:使用map从列表中提取辅助元组值

计算第一个数字相似的所有元组值的平均值

从 dict 创建数据框,其中键是元组,值是列表

如何从 PySpark 中的多个列创建字典列表,其中键是列名,值是该列的值?

Pyspark 在元组列表上设置

从 Pyspark 中的 RDD 中提取字典