如何使用 pyspark.resultiterable.ResultIterable 对象

Posted

技术标签:

【中文标题】如何使用 pyspark.resultiterable.ResultIterable 对象【英文标题】:how to use pyspark.resultiterable.ResultIterable object 【发布时间】:2018-03-22 12:18:13 【问题描述】:

我有 1TB 的记录在一对 rdd 中结构化,我想按键对我的所有记录进行分组,然后仅将函数应用于值。

我的代码如下:

rdd = sc.textFile("path").map(lambdal:l.split(";"))
rdd_pair=rdd.map(lambda a: (a[0], a))
rdd_pair.take(3)
#output: [('id_client', ('id_client','time','city')]
#[('1', [('1', '2013/03/12 23:59:59', 'London')]
#[('1', [('1', '2013/12/03 10:43:12', 'Rome')]
#[('1', [('1', '2013/05/01 00:09:59', 'Madrid')]

我想按 id_client 对所有记录进行分组,然后仅将函数矩阵应用于值。对于每个键,该函数按“时间”对元组列表进行排序,然后该函数提取从一个城市到另一个城市的转换。

grouped=rdd_pair.groupByKey(200)
grouped.take(1)
#output [("1",<pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210)]

def matrix(input):
    output=[]
    input_bag= sorted(input, key=lambda x: x[1], reverse=False)
    loc0 = input_bag[0]
    for loc in input_bag[1:]:
        output.append((loc0[2],loc[2]))
        loc0 = loc
    return output

transition=grouped.mapValues(lambda k: matrix(k)).filter(lambda l: l[1]!=[])

我想要的输出是:

#output transition: [('1', [('London', 'Madrid'),('Madrid', 'Rome')])]

我遇到 Python 错误:列表索引超出范围错误

有人可以帮助我吗?谢谢

【问题讨论】:

您能否添加一些示例输入、所需的输出以及对您正在尝试做什么的更清晰的描述? matrix() 应该返回什么函数?请在how to make good reproducible apache spark dataframe examples 上阅读此帖子并尝试提供minimal reproducible example。 我认为数据格式不正确,并且缺少某些属性。唯一可能导致此错误的行是 loc0 = input_bag[0] 所有其他 (loc0, loc, x) 都是元组。 【参考方案1】:

我是这样解决的:

def matrix(input):
    output=[]
    input2=[i[0] for i in input]
    input_bag= sorted(input2, key=lambda x: x[1], reverse=False)
    loc0 = input_bag[0]
    for loc in input_bag[1:]:
        output.append((loc0[2],loc[2]))
        loc0 = loc
    return output

在使用 Python 内置函数“排序”之前,我在 input2(元组列表)中转换输入(可迭代对象)

【讨论】:

以上是关于如何使用 pyspark.resultiterable.ResultIterable 对象的主要内容,如果未能解决你的问题,请参考以下文章

如何使用本机反应创建登录以及如何验证会话

如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

如何使用 AngularJS 的 ng-model 创建一个数组以及如何使用 jquery 提交?

如何使用laravel保存所有行数据每个行名或相等

如何使用 Math.Net 连接矩阵。如何使用 Math.Net 调用特定的行或列?

WSARecv 如何使用 lpOverlapped?如何手动发出事件信号?