在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower'
Posted
技术标签:
【中文标题】在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: \'NoneType\' object has no attribute \'lower\'【英文标题】:Problem in using contains and udf in Pyspark: AttributeError: 'NoneType' object has no attribute 'lower'在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower' 【发布时间】:2020-02-24 13:14:24 【问题描述】:我有 2 个数据框,df1 和 df2:
df1:
+-------------------+----------+------------+
| df1.name |df1.state | df1.pincode|
+-------------------+----------+------------+
| CYBEX INTERNATION| HOUSTON | 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+----------+------------+
df2:
+--------------------+------------+------------+
| df2.name |df2.state | df2.pincode|
+--------------------+------------+------------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON | 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+------------+------------+
我的工作是验证 df1 中的数据是否存在于 df2 上,如果确实 validate = 1 则 validate = 0。 现在我在条件、状态和 Pincode 上运行一些连接操作,对于字符串比较,我首先将字符串转换为小写,排序并使用 Python 序列匹配。 预期输出为:
+-------------------+-------------------+----------+------------+------------+
| df1.name|df2.name |df1.state | df1.pincode| Validated |
+-------------------+-------------------+----------+------------+------------+
| CYBEX INTERNATION| NULL |HOUSTON | 00530 | 0 |
| FLUID POWER|FLUID POWER PVT LTD|MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| NULL |FRANCE | 072234 | 0 |
| K N ENTERPRISES| NULL |MUMBAI | 100010 | 0 |
+-------------------+-------------------+----------+------------+------------+
我有我的代码:
from pyspark.sql.types import *
from difflib import SequenceMatcher
from pyspark.sql.functions import col,when,lit,udf
contains = udf(lambda s, q: SequenceMatcher(None,"".join(sorted(s.lower())), "".join(sorted(q.lower()))).ratio()>=0.9, BooleanType())
join_condition = ((col("df1.pincode") == col("df2.pincode")) & (col("df1.state") == col("df2.state")))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left").where(contains(col("df1.name"), col("df2.name")))
result = result_df.select("df1.*",when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
result.show()
但是输出给了我 AttributeError: 'NoneType' 对象没有属性 'lower' 我知道不匹配的列是 Null 所以这就是为什么 s.lower() 和 p.lower() 不起作用,但是如何解决这个问题。我只想要包含中的这个条件,来做过滤过程。
另外,我需要在结果中包含 df2.name 列,因为我在列表中给出了 col 名称:
cols = ["df1.name","df2.name","df1.state","df1.pincode"]
result = result_df.select(*cols,when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
但我又遇到了一个错误: SyntaxError:只有命名参数可以跟在 *expression 之后
任何帮助将不胜感激。谢谢。
【问题讨论】:
请提供一个可重现的例子。花点时间了解如何发布 spark 问题:***.com/questions/48427185/… 嗨@YOLO 我也更新了我的问题陈述和代码。如果我应用过滤器,我会得到结果,即 result.filter("validate>0").show() 但对于 result.show() 我会出错。 【参考方案1】:在您的 UDF 中,您使用的是 .lower
方法。这个方法是str
对象的方法。显然,在您的 Dataframe 中,您在 df1.name
或 df2.name
的某个位置有一些 None
值。
用这样的东西替换你当前的 UDF 来处理 None :
contains = udf(
lambda s, q: SequenceMatcher(
None,
"".join(sorted((s or "").lower())),
"".join(sorted((q or "").lower()))
).ratio()>=0.9, BooleanType()
)
【讨论】:
非常感谢@Steven 的回复。我会尝试使用我的代码,如果有帮助,我会排除你的答案。以上是关于在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower'的主要内容,如果未能解决你的问题,请参考以下文章
从文件中读取规则并将这些规则应用于 pyspark 数据框行
如何使用 databricks 将一个 azure 数据湖容器中的所有内容传输到另一个容器?
如何在 Pyspark 中使用 groupby 和数组元素?