从 RDD 中的元组中解包项目时出现 Spark 错误
Posted
技术标签:
【中文标题】从 RDD 中的元组中解包项目时出现 Spark 错误【英文标题】:Spark Error when unpacking items from tuple in RDD 【发布时间】:2019-10-02 02:23:15 【问题描述】:我在 Jupyter notebook 上写了一个脚本来读取 RDD 并执行操作。该脚本在 Jupyter 上运行良好。
rdd= [('xxxxx99', ['cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ',
'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ',
'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'],
['pol_cat_id':'234','pol_dt':'20100220'],
['qor_pol_id':'23492','qor_cd':'30']),
('xxxxx86', ['cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX',
'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX',
'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'],
['pol_cat_id':'532','pol_dt':'20091020'],
['qor_pol_id':'49320','qor_cd':'21']) ]
def flatten_map(record):
# Unpack items
id, items, [line], [pls] = record
pol_id = pls["pol_cat_id"]
pol_dt = pls["pol_dt"]
qor_id = pls["qor_pol_id"]
for item in items:
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
result = (rdd
# Expand data
.flatMap(flatten_map)
# Flatten tuples
.map(lambda x: x[0],)))
但是,当转换为 Python 脚本时,出现错误:
2019-10-01 14:12:46,901:ERROR: id, items, [line], [pls] = record
2019-10-01 14:12:46,901:ERROR:ValueError: 没有足够的值来解压
(预期为 1,得到 0)
有什么建议吗? Python 在 notebook 和 .py 上的处理方式有区别吗?
【问题讨论】:
通常 IDE 与可执行错误是因为内存中存储了一些您不知道的变量。这是 Jupiter 和你正在执行的整个脚本吗?你是如何在命令行上执行这个脚本的? 我将其移至 .py 文件并使用 python file.py 执行 .py 文件 【参考方案1】:为正确的变量取正确的值只是一些错误。
请通过以下代码:
rdd = [('xxxxx99', ['cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ',
'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ',
'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'],
['pol_cat_id':'234','pol_dt':'20100220'],
['qor_pol_id':'23492','qor_cd':'30']),
('xxxxx86', ['cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX',
'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX',
'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'],
['pol_cat_id':'532','pol_dt':'20091020'],
['qor_pol_id':'49320','qor_cd':'21']) ]
def flatten_map(record):
# Unpack items
id, items, [line], [pls] = record
pol_id = line["pol_cat_id"]
pol_dt = line["pol_dt"]
qor_id = pls["qor_pol_id"]
for item in items:
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
result = spark.sparkContext.parallelize(rdd).flatMap(flatten_map).map(lambda x: x[0])
result.collect()
# OUTPUT
[('xxxxx99', 'Q', '100', '100', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '33', '200', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '64', '10', 'AZ', '234', '20100220', '23492'), ('xxxxx86', 'R', '20', '100', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '44', '500', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '66', '50', 'TX', '532', '20091020', '49320')]
【讨论】:
@algorythms 当然。也请选择并支持我的答案以上是关于从 RDD 中的元组中解包项目时出现 Spark 错误的主要内容,如果未能解决你的问题,请参考以下文章