Spark & Python 2.7 - 复杂的数据结构 - GroupByKey

Posted

技术标签:

【中文标题】Spark & Python 2.7 - 复杂的数据结构 - GroupByKey【英文标题】:Spark & Python 2.7 - Complicated Data Structure - GroupByKey 【发布时间】:2017-06-26 14:21:06 【问题描述】:

我有一个如下所示的 rdd:

totalsrdd = [((2, 16),[[1,2,3,...,36],[2,2,3,...,36]]),((2,17),[[1,2,3,...,36]]),...]

键是天 (2,16) 等,它们分别对应于一个或多个 36 个数字的列表。对于每个日期,我需要一个列表,其中列表中的第 i 个条目是每个列表中第 i 个条目的平均值,或者相​​应日期的列表。

例如,对于 (2,16),第一个条目的平均值为 (1+2)/(36+36) 或 .04166,因为该日期有两个列表。

newRdd = [((2,16),[[.04166,.055555,.083333,...,1]]),(2,17),[[.027777,.055555,.083333,...,1]]),...]

由于 (2,17) 只有一个列表,因此列表中的每个条目都除以 36。

这是我到目前为止的代码。数据远不止两个日期。

def get_partition(x):
    j = [(x[1][i]).total_seconds() for i in range(len(x[1]))]
    return (x[0],j)
newTimeDeltaRdd2 = newtimeDeltaRdd.map(lambda x : ((x[1].month,x[1].day), x[0]))
totals = newTimeDeltaRdd2.map(lambda x: (get_partition(x)))
totalsrdd = totals.groupByKey().map(lambda x : (x[0], list(x[1])))

谢谢!

【问题讨论】:

是否可以将 totalsrdd 转换为字典?这将使这变得容易得多。 您可能希望将操作1+2/36+36 编辑为(1+2)/(36+36) 也许我在这里遗漏了一些东西,但newrdd = totalsrdd.mapValues(lambda l: [sum(x)/len(l) for x in zip(*l)]) 不会做这项工作吗? /(len(l)*36)) 用于您对平均值的定义,我不太明白。 (1+2)/2 是平均值的正常定义。 【参考方案1】:

这是获取newrdd的可能解决方案:

totalsrdd = [((2, 16),[[1,2,3,...,36],[2,2,3,...,36]]),((2,17),[[1,2,3,...,36]]),...]

newrdd = []
for key, _list in totalsrdd:
    averages = []
    for i in range(36):
        averages.append(sum([_l[i] for _l in _list]) / 36 * len(_list))
    newrdd.append((key, averages)) 

【讨论】:

答案不应该没有解释 - 请编辑您的答案并添加解释更改的内容和原因 我收到一条错误消息,提示“PipelinedRDD”对象不可迭代【参考方案2】:

会产生您描述的行为的快速而肮脏的解决方案。

我还是会考虑使用字典

import numpy as np
for entry in totalsrdd:
    sum = np.zeros(36)
    for ls in entry[1]:
        sum = np.add(sum, ls)
    sum = np.divide(sum, len(entry[1]) * 36)
    entry[1] = sum

【讨论】:

这更新了 totalsrdd。让它创建newrdd 将是一个小修改。 我收到一条错误消息,提示“PipelinedRDD”对象不可迭代

以上是关于Spark & Python 2.7 - 复杂的数据结构 - GroupByKey的主要内容,如果未能解决你的问题,请参考以下文章

AWS Elastic Beanstalk 500 错误 (Django) - Python-urllib/2.7 & 静态文件错误

2.将python版本改为2.7的方法

Linux python2.4升级到2.7

如何在 spark-2.1.1-bin-hadoop2.7 的 bin 文件夹外运行 spark-jobs

spark-submit 执行scala代码

Spark实践|如何让CDSW的PySpark自动适配Python版本