在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower'

Posted

技术标签:

【中文标题】在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: \'NoneType\' object has no attribute \'lower\'【英文标题】:Problem in using contains and udf in Pyspark: AttributeError: 'NoneType' object has no attribute 'lower'在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower' 【发布时间】:2020-02-24 13:14:24 【问题描述】:

我有 2 个数据框,df1 和 df2:

df1:

+-------------------+----------+------------+
|         df1.name  |df1.state | df1.pincode|
+-------------------+----------+------------+
|  CYBEX INTERNATION| HOUSTON  | 00530      |
|        FLUID POWER| MEDWAY   | 02053      |
|   REFINERY SYSTEMS| FRANCE   | 072234     |
|    K N ENTERPRISES| MUMBAI   | 100010     |
+-------------------+----------+------------+

df2:

+--------------------+------------+------------+
|           df2.name |df2.state   | df2.pincode|
+--------------------+------------+------------+
|FLUID POWER PVT LTD | MEDWAY     | 02053      |
|  CYBEX INTERNATION | HOUSTON    | 02356      |
|REFINERY SYSTEMS LTD| MUMBAI     | 072234     |
+--------------------+------------+------------+

我的工作是验证 df1 中的数据是否存在于 df2 上,如果确实 validate = 1 则 validate = 0。 现在我在条件、状态和 Pincode 上运行一些连接操作,对于字符串比较,我首先将字符串转换为小写,排序并使用 Python 序列匹配。 预期输出为:

+-------------------+-------------------+----------+------------+------------+
|           df1.name|df2.name           |df1.state | df1.pincode|  Validated |
+-------------------+-------------------+----------+------------+------------+
|  CYBEX INTERNATION| NULL              |HOUSTON   | 00530      |     0      |
|        FLUID POWER|FLUID POWER PVT LTD|MEDWAY    | 02053      |     1      |
|   REFINERY SYSTEMS| NULL              |FRANCE    | 072234     |     0      |
|    K N ENTERPRISES| NULL              |MUMBAI    | 100010     |     0      |
+-------------------+-------------------+----------+------------+------------+

我有我的代码:

from pyspark.sql.types import *
from difflib import SequenceMatcher
from pyspark.sql.functions import col,when,lit,udf

contains = udf(lambda s, q: SequenceMatcher(None,"".join(sorted(s.lower())), "".join(sorted(q.lower()))).ratio()>=0.9, BooleanType())
join_condition = ((col("df1.pincode") == col("df2.pincode")) & (col("df1.state") == col("df2.state")))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left").where(contains(col("df1.name"), col("df2.name")))
result = result_df.select("df1.*",when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
result.show()

但是输出给了我 AttributeError: 'NoneType' 对象没有属性 'lower' 我知道不匹配的列是 Null 所以这就是为什么 s.lower() 和 p.lower() 不起作用,但是如何解决这个问题。我只想要包含中的这个条件,来做过滤过程。

另外,我需要在结果中包含 df2.name 列,因为我在列表中给出了 col 名称:

cols = ["df1.name","df2.name","df1.state","df1.pincode"]
result = result_df.select(*cols,when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))

但我又遇到了一个错误: SyntaxError:只有命名参数可以跟在 *expression 之后

任何帮助将不胜感激。谢谢。

【问题讨论】:

请提供一个可重现的例子。花点时间了解如何发布 spark 问题:***.com/questions/48427185/… 嗨@YOLO 我也更新了我的问题陈述和代码。如果我应用过滤器,我会得到结果,即 result.filter("validate>0").show() 但对于 result.show() 我会出错。 【参考方案1】:

在您的 UDF 中,您使用的是 .lower 方法。这个方法是str对象的方法。显然,在您的 Dataframe 中,您在 df1.namedf2.name 的某个位置有一些 None 值。

用这样的东西替换你当前的 UDF 来处理 None :

contains = udf(
    lambda s, q: SequenceMatcher(
        None,
        "".join(sorted((s or "").lower())), 
        "".join(sorted((q or "").lower()))
    ).ratio()>=0.9, BooleanType()
)

【讨论】:

非常感谢@Steven 的回复。我会尝试使用我的代码,如果有帮助,我会排除你的答案。

以上是关于在 Pyspark 中使用 contains 和 udf 时出现问题:AttributeError: 'NoneType' object has no attribute 'lower'的主要内容,如果未能解决你的问题,请参考以下文章

从文件中读取规则并将这些规则应用于 pyspark 数据框行

如何使用 databricks 将一个 azure 数据湖容器中的所有内容传输到另一个容器?

如何在 Pyspark 中使用 groupby 和数组元素?

在 pyspark 作业中发送和使用 virtualenv

Pyspark 和 BigQuery 在 Google Dataproc 中使用两个不同的项目 ID

在 pyspark 中使用 UDF 和简单数据帧