pyspark:用于确定 NaN 或 Null 的用户定义函数不起作用

Posted

技术标签:

【中文标题】pyspark:用于确定 NaN 或 Null 的用户定义函数不起作用【英文标题】:pyspark: user-defined function for determining NaN or Null is not working 【发布时间】:2018-04-16 08:18:37 【问题描述】:

我正在尝试在 pyspark 中编写一个用户定义的函数,用于确定数据帧中的给定条目是否错误(Null 或 NaN)。我似乎无法弄清楚我在这个函数中做错了什么:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

def is_bad(value):
   if (value != value | (value.isNull())):
       return True
   else:
       return False

isBadEntry = UserDefinedFunction(lambda x: is_bad(x),BooleanType())

df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()

这是因为一个神秘的错误而崩溃:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-379-b4109047ba40> in <module>()
  1 df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
  2 #df_test.show()
----> 3 df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
285         +---+-----+
286         """
--> 287         print(self._jdf.showString(n, truncate))
288 
289     def __repr__(self):

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

 /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61     def deco(*a, **kw):
 62         try:
---> 63             return f(*a, **kw)
 64         except py4j.protocol.Py4JJavaError as e:
 65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317                 raise Py4JJavaError(
318                     "An error occurred while calling 012.\n".
--> 319                     format(target_id, ".", name), value)
320             else:
321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o29884.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 4 times, most recent failure: Lost task 0.3 in stage 43.0 (TID 167, 172.16.193.79): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
 func = lambda _, it: map(mapper, it)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
 File "<ipython-input-378-2aac40340a6c>", line 14, in <lambda>
 File "<ipython-input-378-2aac40340a6c>", line 9, in is_bad
AttributeError: 'NoneType' object has no attribute 'isNull'

有人可以帮忙吗?

【问题讨论】:

value.isNull() 更改为value is None 【参考方案1】:

正如 Psidom 在评论中暗示的那样,在 Python 中,NULL 对象是单例 None (source);按以下方式更改功能可以:

def is_bad(value):
   if (value != value) | (value is None):
       return True
   else:
       return False

isBadEntry = UserDefinedFunction(lambda x: is_bad(x),BooleanType())
df_test.withColumn("testing", is_bad(df_test.id)).show()
# +-------+---------+----+-------+ 
# |session|timestamp|  id|testing|
# +-------+---------+----+-------+
# |      1|        1|null|   true|
# |      1|        2|   5|  false|
# |      1|        3|null|   true|
# |      1|        4|null|   true|
# |      1|        5|  10|  false|
# |      1|        6|null|   true|
# +-------+---------+----+-------+

也适用于NaN

from pyspark.sql import Row

# toy data:
df = spark.createDataFrame([Row(1.0, 7., None),
                          Row(2., 4., float('nan')),
                          Row(3., 3., 5.0),
                          Row(4., 1., 4.0),
                          Row(5., 1., 1.0)],
                          ["col_1", "col_2", "col_3"])

df.withColumn("testing", isBadEntry(df.col_3)).show()
# +-----+-----+-----+-------+ 
# |col_1|col_2|col_3|testing|
# +-----+-----+-----+-------+ 
# |  1.0|  7.0| null|   true|
# |  2.0|  4.0|  NaN|   true|
# |  3.0|  3.0|  5.0|  false|
# |  4.0|  1.0|  4.0|  false|
# |  5.0|  1.0|  1.0|  false|
# +-----+-----+-----+-------+

【讨论】:

以上是关于pyspark:用于确定 NaN 或 Null 的用户定义函数不起作用的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 计算 Spark 数据帧每列中非 NaN 条目的数量

使用带有熊猫数据的 CreateDataFrame 时将 NaN 替换为 null

null,undefined,NaN的区别

Pyspark SQL选择列为NaN的数据[重复]

如何迭代或递归确定二维数组中的邻居?

JS杂谈