从 RDD 中的元组中解包项目时出现 Spark 错误

Posted

技术标签:

【中文标题】从 RDD 中的元组中解包项目时出现 Spark 错误【英文标题】:Spark Error when unpacking items from tuple in RDD 【发布时间】:2019-10-02 02:23:15 【问题描述】:

我在 Jupyter notebook 上写了一个脚本来读取 RDD 并执行操作。该脚本在 Jupyter 上运行良好。

rdd=   [('xxxxx99', ['cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ',
                  'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ',
                  'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'],
                  ['pol_cat_id':'234','pol_dt':'20100220'],
                  ['qor_pol_id':'23492','qor_cd':'30']),

     ('xxxxx86', ['cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX',
                  'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX',
                  'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'],
                  ['pol_cat_id':'532','pol_dt':'20091020'],
                  ['qor_pol_id':'49320','qor_cd':'21']) ]
              

def flatten_map(record):
    # Unpack items
    id, items, [line], [pls] = record
    pol_id = pls["pol_cat_id"]
    pol_dt = pls["pol_dt"]
    qor_id = pls["qor_pol_id"]
    for item in items:
        yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1


 result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0],))) 

但是,当转换为 Python 脚本时,出现错误:

2019-10-01 14:12:46,901:ERROR: id, items, [line], [pls] = record

2019-10-01 14:12:46,901:ERROR:ValueError: 没有足够的值来解压

(预期为 1,得到 0)

有什么建议吗? Python 在 notebook 和 .py 上的处理方式有区别吗?

【问题讨论】:

通常 IDE 与可执行错误是因为内存中存储了一些您不知道的变量。这是 Jupiter 和你正在执行的整个脚本吗?你是如何在命令行上执行这个脚本的? 我将其移至 .py 文件并使用 python file.py 执行 .py 文件 【参考方案1】:

为正确的变量取正确的值只是一些错误。

请通过以下代码:

rdd = [('xxxxx99', ['cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ',
                  'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ',
                  'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'],
                  ['pol_cat_id':'234','pol_dt':'20100220'],
                  ['qor_pol_id':'23492','qor_cd':'30']),
     ('xxxxx86', ['cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX',
                  'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX',
                  'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'],
                  ['pol_cat_id':'532','pol_dt':'20091020'],
                  ['qor_pol_id':'49320','qor_cd':'21']) ]
def flatten_map(record):
    # Unpack items
    id, items, [line], [pls] = record
    pol_id = line["pol_cat_id"]
    pol_dt = line["pol_dt"]
    qor_id = pls["qor_pol_id"]
    for item in items:
        yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
result = spark.sparkContext.parallelize(rdd).flatMap(flatten_map).map(lambda x: x[0])
result.collect()
# OUTPUT
[('xxxxx99', 'Q', '100', '100', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '33', '200', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '64', '10', 'AZ', '234', '20100220', '23492'), ('xxxxx86', 'R', '20', '100', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '44', '500', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '66', '50', 'TX', '532', '20091020', '49320')]

【讨论】:

@algorythms 当然。也请选择并支持我的答案

以上是关于从 RDD 中的元组中解包项目时出现 Spark 错误的主要内容,如果未能解决你的问题,请参考以下文章

如何对 spark scala RDD 中的元组列表/数组执行转换?

解压更长元组的最佳实践 (Python 3.6)

从列表中的元组中删除空字符串

在 python 中解包一个元组并对其进行迭代

Python中的元组(Tuple)

我的元组中的那些小“u”是啥? (python 2.7)[重复]