如何在pyspark地图中添加增量数字

Posted

技术标签:

【中文标题】如何在pyspark地图中添加增量数字【英文标题】:how to add incremental numbers in pyspark map 【发布时间】:2016-07-12 20:31:09 【问题描述】:

我有这个代码:

import time
from datetime import datetime

ts = time.time()
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')

claims_data = data.map(lambda x: x.split(","))
claim_id = claims_data.map(lambda x: (
    x[9],
    'Claim_id',
    '0',
    'Claim_id',
    'Claim',
    1,
))

BeginDOS = claims_data.map(lambda x: (
    x[13],
    'BeginDOS',
    '0',
    'BeginDOS',
    'Claim',
    1,
))

我正在尝试从现有 RDD 中转换数据,这就是我正在尝试做的事情:

此代码示例提取单个列。 claim_id 是 x[9],BeginDOS 是当前加载的 RDD 的 x[13],称为 data。使用地图,我正在为每一列使用静态数据创建新的 RDD。随着时间的推移,我将添加更多地图。

我的问题是最后一个字段。在这些情况下,两者都被硬设置为数值 1。

我需要实例化一个变量计数器,并用一个数字预加载该变量。我想在claim_idBeginDOS 中使用该数字,但每次都增加它。对于claim_id 中的每条记录,它都会递增,BeginDOS 也是如此

我该怎么做?

谢谢

更新 #1。请求的源数据示例:

1,001,885,2HZL911L0,00,99,,L91279331,001,71,59404,03,MED,2014-05-28,2014-05-28,72885,7840,8460,8470,8471,8472,,920,920,0,0,2014-07-09,C,261435747,U,U6FIL,1,,2014-05-28,2014-05-28,12,0,R0129,845,845,0,U6FIL

谢谢。

【问题讨论】:

您能否附上data 的样本? 我放了一些。非常感谢 【参考方案1】:

通过您的示例,我假设您希望索引基于 1(非 0)。

如果是这样,这应该可以满足您的需求(可以将此模板用于两个变量):

claim_ids = map(lambda x: (
    x[1][13],
    'BeginDOS',
    '0',
    'BeginDOS',
    'Claim',
    x[0]+1,
),enumerate(claims_data))

[('2014-05-28', 'BeginDOS', '0', 'BeginDOS', 'Claim', 1),
 ('2014-05-28', 'BeginDOS', '0', 'BeginDOS', 'Claim', 2)]

# the x used in the lambda is a tuple with (index#,value)

【讨论】:

谢谢,但是山雀给了我TypeError: 'PipelinedRDD' object is not iterable 错误。 明白了。我没有看到claims_data也是一个RDD,我的错

以上是关于如何在pyspark地图中添加增量数字的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 的结构化流作业中运行地图转换

如何使用 Vue JS 在子组件中保留数字和增量?

如何更改pyspark中的并行任务数

如何在r中添加特定(选定)美国州地图的边界?

MySQL 自动增量是如何工作的?

如何在 Spark SQL(PySpark) 中实现自增