PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的

Posted

技术标签:

【中文标题】PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的【英文标题】:PySpark: filtering out RDD elements fails on 'NoneType' object is not iterable 【发布时间】:2015-03-11 20:21:57 【问题描述】:

我想filter 输出字段“状态”不等于“确定”的 RDD 元素。我从 HDFS 上的一组 CSV 文件创建我的 RDD,然后在尝试 filter 之前使用map 获得我想要的结构:

import csv, StringIO    

files = "/hdfs_path/*.csv"

fields = ["time", "status"]

dial = "excel"

default = 'status': 'OK', 'time': '2014-01-01  00:00:00'

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
        line = reader.next()
        if line is None:
            return default
        else:
            return line
    except:
        return default

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: "OK" not in x['status'])

我可以对这个 RDD 做其他事情——例如另一个mapget 仅某些字段等。但是,当我使用filter 运行我的代码时,其中一个任务总是失败,并在我的filter lambda 函数中出现异常:

'NoneType object is not iterable'

我认为这意味着filter lambda 正在接收None,因此我将代码添加到loadRecord 以避免返回None。但是,我仍然遇到同样的错误。它确实适用于小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪些部分可能导致问题。

任何意见表示赞赏!

【问题讨论】:

【参考方案1】:

首先,将map(lambda x: loadRecord(x, fields, dial)) 替换为map(lambda x: (x, loadRecord(x, fields, dial))) - 这样您既可以保存原始记录,也可以保存已解析的记录。

其次,将filter() 调用替换为flatMap(test_function) 并定义test_function 测试输入的方式,如果第二个传递的参数为None(解析记录),则返回第一个。

这样,您将获得导致问题的输入行,并在本地测试您的脚本。一般来说,我会在loadRecord 函数的第一行添加一行global default

【讨论】:

我用mapcheckNone 函数实现了这个,它返回了违规行或默认行。然后我运行filter 删除所有默认行。最终输出包含 0 行,这很有趣。感谢您的想法!我将把它留到明天,看看是否还有其他意见;如果没有,我会接受你的回答,因为它确实让我绕过了“无”异常。【参考方案2】:

以0x0FFF 的回答为基础,我能够运行我的代码。我仍然没有看到违规文件的违规行,但我比以前更接近了。这是我所做的,从我的问题中的代码开始:

def checkNone(x):
    try:
        return "OK" not in x['status']
    except:
        return True

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: checkNone(x))

【讨论】:

您是否尝试过重新划分滤波后的谐波,harmonics = harmonics.repartition(numberleft)

以上是关于PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的的主要内容,如果未能解决你的问题,请参考以下文章

从列表中删除 NoneType 元素的本机 Python 函数?

如何在过滤器pyspark RDD中过滤掉某种模式[重复]

Pyspark 使用 .filter() 过滤掉空列表

如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?

过滤掉pyspark RDD中的非数字值

AttributeError:'NoneType'对象没有属性'upper'[关闭]