pyspark:TypeError:'float'对象不可迭代
Posted
技术标签:
【中文标题】pyspark:TypeError:\'float\'对象不可迭代【英文标题】:pyspark: TypeError: 'float' object is not iterablepyspark:TypeError:'float'对象不可迭代 【发布时间】:2018-04-22 05:55:54 【问题描述】:我正在编写火花代码,但总是出错:
TypeError: 'float' object is not iterable
就行了reduceByKey()
函数。有人能帮我吗?
这是错误的堆栈跟踪:
d[k] = comb(d[k], v) if k in d else creator(v)
File "/home/hw/SC/SC_spark.py", line 535, in <lambda>
TypeError: 'float' object is not iterable
代码如下:
def field_valid(m):
dis=m[1]
TxP=m[2]
ef=m[3]
pl=m[4]
if TxP != 'NaN' and disl != 'NaN' and ef !='NaN' and pl != 'NaN':
return True
else:
return False
def parse_input(d):
#d=data.split(',')
s_name='S'+d[6] # serving cell name
if d[2] =='NaN' or d[2] == '':
ef='NaN'
else:
ef=float(d[2].strip().rstrip())
if d[7] =='NaN' or d[7] == '' or d[7] == '0':
TxP='NaN'
else:
TxP=float(d[7].strip().rstrip())
if d[9] =='NaN' or d[9] == '':
dis='NaN'
else:
dis=float(d[9].strip().rstrip())
if d[10] =='NaN' or d[10] == '':
pl='NaN'
else:
pl=float(d[10].strip().rstrip())
return s_name,dis, TxP, ef, pl
sc=SparkContext(appName="SC_spark")
lines=sc.textFile(ip_file)
lines=lines.map(lambda m: (m.split(",")))
lines=lines.filter(lambda m: (m[6] != 'cell_name'))
my_rdd=lines.map(parse_input).filter(lambda m: (field_valid(m)==True))
my_rdd=my_rdd.map(lambda x: (x[0],(x[1],x[2])))
my_rdd=my_rdd.reduceByKey(lambda x,y:(max(x[0],y[0]),sum(x[1],y[1]))) #this line got error
这是一些示例数据:
Class,PB,EF,RP,RQ,ID,cell_name,TxP,BW,DIS,PL,geom
NaN,10,5110,-78.0,-7.0,134381669,S417|134381669|5110,62.78151250383644,10,2578.5795095469166,113.0,NaN
NaN,10,5110,-71.0,-6.599999904632568,134381669,S417|134381669|5110,62.78151250383644,10,2689.630258510342,106.0,NaN
NaN,10,5110,-77.0,-7.300000190734863,134381669,S417|134381669|5110,62.78151250383644,10,2907.8184899249713,112.0,19.299999999999983
NaN,10,5110,-91.0,-11.0,134381669,S417|134381669|5110,62.78151250383644,10,2779.96762695867,126.0,5.799999999999997
NaN,10,5110,-90.0,-12.69999980926514,134381669,S417|134381669|5110,62.78151250383644,10,2749.8351648579583,125.0,9.599999999999994
NaN,10,5110,-95.0,-13.80000019073486,134381669,S417|134381669|5110,62.78151250383644,10,2942.7938902934643,130.0,-2.4000000000000057
NaN,10,5110,-70.0,-7.099999904632568,134381669,S417|134381669|5110,62.78151250383644,10,3151.930706017461,105.0,22.69999999999999
【问题讨论】:
我不熟悉pyspark
,但是在发生错误的行中,您使用两个参数调用sum
。除非第一个是可迭代的,第二个是 int,否则您的错误可能就在那里。尝试在 python 控制台上调用sum(1.0, 2)
。它给了我一个非常相似的错误。
嗨@bla,我刚刚测试过,确保所有字段都转换为浮点数。您注意到我在这些值上使用 NaN 过滤了该行,因此该数字仅为浮点数。我还检查了 lambda 函数的语法,我将其分隔为 (k,v)。我没发现有什么不对。有没有发现什么问题?
m.split(",")
到底在做什么?您的数据中没有逗号
@HelenZ 您不能将浮点数作为sum
的第一个参数传递。它期待一个可交互的。看看:docs.python.org/3.5/library/functions.html#sum。我无法确认是这种情况,因为我不确定 x[1]
是不是一个浮点数。但是堆栈跟踪非常相似。
嗨@cricket_007。顺便说一句,我刚改成 x[1]+y[1],它就可以工作了!!我是 spark 新手,还不能区分 spark1 和 spark2。你能告诉我在spark2中怎么做吗?预期结果是相同键的值“dis”的总和和最大值,键是列“cell_name”。
【参考方案1】:
预期结果是值的总和和最大值
在这种情况下,您正在寻找x[1] + y[1]
,而不是使用内置的sum()
函数。
my_rdd.reduceByKey( lambda x,y: ( max(x[0],y[0]), x[1] + y[1] ) )
【讨论】:
嗨@cricket_700,我可以再问一个问题吗?现在我想将结果保存到 .txt 文件中,但我想在 .txt 文件中添加标题,我应该怎么做?我用了这个语句:my_rdd.repartition(1).saveAsTextFile("sc_result/result.txt") 您需要将您的 RDD 与标头 RDD 联合。 ***.com/questions/26157456/…以上是关于pyspark:TypeError:'float'对象不可迭代的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 数据框:访问列(TypeError:列不可迭代)
# 字符串方法 TypeError: 列在 pyspark 中不可迭代
PySpark:TypeError:“行”对象不支持项目分配