如何在pyspark地图中添加增量数字
Posted
技术标签:
【中文标题】如何在pyspark地图中添加增量数字【英文标题】:how to add incremental numbers in pyspark map 【发布时间】:2016-07-12 20:31:09 【问题描述】:我有这个代码:
import time
from datetime import datetime
ts = time.time()
dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
claims_data = data.map(lambda x: x.split(","))
claim_id = claims_data.map(lambda x: (
x[9],
'Claim_id',
'0',
'Claim_id',
'Claim',
1,
))
BeginDOS = claims_data.map(lambda x: (
x[13],
'BeginDOS',
'0',
'BeginDOS',
'Claim',
1,
))
我正在尝试从现有 RDD 中转换数据,这就是我正在尝试做的事情:
此代码示例提取单个列。 claim_id 是 x[9],BeginDOS 是当前加载的 RDD 的 x[13],称为 data
。使用地图,我正在为每一列使用静态数据创建新的 RDD。随着时间的推移,我将添加更多地图。
我的问题是最后一个字段。在这些情况下,两者都被硬设置为数值 1。
我需要实例化一个变量计数器,并用一个数字预加载该变量。我想在claim_id
和BeginDOS
中使用该数字,但每次都增加它。对于claim_id
中的每条记录,它都会递增,BeginDOS
也是如此
我该怎么做?
谢谢
更新 #1。请求的源数据示例:
1,001,885,2HZL911L0,00,99,,L91279331,001,71,59404,03,MED,2014-05-28,2014-05-28,72885,7840,8460,8470,8471,8472,,920,920,0,0,2014-07-09,C,261435747,U,U6FIL,1,,2014-05-28,2014-05-28,12,0,R0129,845,845,0,U6FIL
谢谢。
【问题讨论】:
您能否附上data
的样本?
我放了一些。非常感谢
【参考方案1】:
通过您的示例,我假设您希望索引基于 1(非 0)。
如果是这样,这应该可以满足您的需求(可以将此模板用于两个变量):
claim_ids = map(lambda x: (
x[1][13],
'BeginDOS',
'0',
'BeginDOS',
'Claim',
x[0]+1,
),enumerate(claims_data))
[('2014-05-28', 'BeginDOS', '0', 'BeginDOS', 'Claim', 1),
('2014-05-28', 'BeginDOS', '0', 'BeginDOS', 'Claim', 2)]
# the x used in the lambda is a tuple with (index#,value)
【讨论】:
谢谢,但是山雀给了我TypeError: 'PipelinedRDD' object is not iterable
错误。
明白了。我没有看到claims_data也是一个RDD,我的错以上是关于如何在pyspark地图中添加增量数字的主要内容,如果未能解决你的问题,请参考以下文章