spark UDF 结果可以“显示”,但不能“过滤”

Posted

技术标签:

【中文标题】spark UDF 结果可以“显示”,但不能“过滤”【英文标题】:spark UDF result can do 'show', but can't do 'filter' 【发布时间】:2018-11-21 07:08:19 【问题描述】:

spark UDF 在我执行 show() 时有效,但是当我在 UDF 结果上执行 filter 时它给了我错误。

udf 函数

def chkInterPunctuation(sent) :
    for char in sent[1:-2] : 
        if char in ["\"", "'", ".", "!", "?"] :
            return True
    return False

cip = udf(chkInterPunctuation, BooleanType())

show() 有效

df_punct = dfs.withColumn("in_length", length("input")).\
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()

但是当我做filter时它给了我错误

df_punct.where(col("cip") == True).show()

这些是filter 错误

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-171-e206ffd07f75> in <module>()
----> 1 df_punct.where(col("cip") == True).collect()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)
    308         """
    309         with SCCallSiteSync(self._sc) as css:
--> 310             port = self._jdf.collectToPython()
    311         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    312 

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling 012.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-153-fce920dc0de2>", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

我的谷歌搜索建议py4j 错误通常发生在UDF 函数没有返回正确值或有错误时。但我的UDF function 总是返回真或假。此外,当我显示时,火花查询会返回正确的值。这对我来说没有意义。可能的原因是什么?

提前感谢您

【问题讨论】:

错误可能是由于某些 udf 输入太短造成的吗?还是空的? 你能用一些输入数据更新你的问题吗? TypeError: 'NoneType' object has no attribute ' getItem,在我看来你只需要将 cip 列转换为 BooleanType 在你做当然的过滤之前 在不确定的情况下,我猜show() 在 pyspark 中的工作方式与在常规 spark (scala) 中的工作方式相同。在正常的 spark 中,show - 与collect 不同,将不会强制评估整个数据帧,而只会评估一小部分数据。仅足以显示结果集的一小部分。我怀疑您的show 命令处理的某些行中的输入数据可能存在问题,但您的filter(...).collect() 命令确实会处理这些数据。也许尝试做一个df_punct.collect()(无过滤器)而不是df_punct.show()来验证这一点? 【参考方案1】:

发生这种情况是因为您没有更正NULL 的存在。试试:

def chkInterPunctuation(sent) :
    if not sent: return   # In None return
    for char in sent[1:-2] : 
        if char in ["\"", "'", ".", "!", "?"] :
            return True
    return False

【讨论】:

【参考方案2】:

问题已通过更新到 spark2.4.0 得到解决。 (我用的是2.0.0)

【讨论】:

【参考方案3】:

这是一个工作示例:

df = spark.sql(
  """
  select 'gjgjgjgjgjg' as word
  union 
    select 'lalalal?ala' as word
  union 
    select 'ryryry.ryry' as word  
  """)

df.createOrReplaceTempView("words")

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import col
def chkInterPunctuation(sent) :
    for char in sent[1:-2] : 
        if char in ["\"", "'", ".", "!", "?"] :
            return True
    return False

cip = udf(chkInterPunctuation, BooleanType())

udf_df = df.withColumn("cip", cip(col("word")))

udf_df.where("cip = true").show()

不管怎样,你可以在没有 udf 的情况下做到这一点:

spark.sql("""
SELECT * 
FROM   words 
WHERE  word LIKE '%.%' 
        OR word LIKE '%?%' 
""").show()

【讨论】:

【参考方案4】:

我认为是因为您在申请where() 后使用了show()。 我建议首先应用过滤器并将其保存在一个新变量中,然后执行show()

df_punct_filtered = df_punct.where(col("cip") == True)

df_punct_filtered.show()

更新: 或者您可以使用filter() 函数代替where()

df_punct_filtered = df_punct.filter(df_punct.cip == True)

df_punct_filtered.show()

【讨论】:

我已经尝试了你的建议。但它给了我同样的错误。在我看来,Spark 做惰性求值,所以声明新变量不会改变含义。 对不起,过滤器只是 where 的别名。我试过了,还是不行

以上是关于spark UDF 结果可以“显示”,但不能“过滤”的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?

spark 能执行udf 不能执行udaf,啥原因

spark 能执行udf 不能执行udaf,啥原因

spark 能执行udf 不能执行udaf,啥原因

Scala中的Spark分组映射UDF

我们可以在 Spark 中编写配置单元查询吗?UDF