Can't apply scalar pandas udf in pyspark

Posted: 2020-04-02 09:31:04

Question:

I am trying to apply a Python function to a DataFrame using pandas_udf. This is the function:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            # note: the converter must be referenced via self
            texto = self.h2t.handle(raw_text)
        except Exception:
            texto = "PARSE HTML ERROR"
        return texto

I apply the function extract_text to each pandas Series inside the udf, as follows:

import pyspark.sql.functions as f
import pyspark.sql.types as t

udfs = Udfs()
extract_text_udf = f.pandas_udf(lambda s: s.apply(udfs.extract_text), t.StringType())
df = df.withColumn("texto", extract_text_udf(f.col("html_raw")))
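Outside Spark, the per-batch logic the scalar pandas UDF performs can be sketched with plain pandas. This is only an illustration: it substitutes a minimal stdlib `HTMLParser`-based extractor for `html2text` so the sketch is self-contained, and `_TextExtractor` and `extract_text` here are stand-in names, not part of the question's code.

```python
from html.parser import HTMLParser

import pandas as pd


class _TextExtractor(HTMLParser):
    # Minimal stand-in for html2text.HTML2Text: collects text nodes only,
    # implicitly ignoring links and images like the original configuration.
    def __init__(self):
        super().__init__()
        self.chunks = []

    def handle_data(self, data):
        self.chunks.append(data)


def extract_text(raw_html: str) -> str:
    try:
        parser = _TextExtractor()
        parser.feed(raw_html)
        return "".join(parser.chunks)
    except Exception:
        return "PARSE HTML ERROR"


# The scalar pandas UDF receives a pandas Series per batch and must
# return a Series of the same length -- exactly what .apply does here.
s = pd.Series(["<p>hello <b>world</b></p>"])
print(s.apply(extract_text).tolist())  # ['hello world']
```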

Then I get the following error:

Traceback (most recent call last):
  File "process_info.py", line 70, in <module>
    row_count = info_df.count()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 523, in count
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o123.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 2327, ip-10-2-6-163.eu-west-1.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt/yarn/usercache/hadoop/appcache/application_1585817706233_0001/container_1585817706233_0001_01_000004/pyspark.zip/pyspark/serializers.py", line 254, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

How can I apply the function extract_text to my dataframe using pandas_udf?

Comments:

Answer 1:

I found the problem. The issue was the Apache Arrow version: Spark releases earlier than 2.4.5 fail with pyarrow versions > 0.14.0.
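The compatibility rule stated in the answer can be expressed as a small version check. This is an illustrative helper, not part of any PySpark API; the function name is made up, and the 0.14.0 cutoff is taken directly from the answer above.

```python
# Illustrative helper: per the answer above, Spark releases before
# 2.4.5 fail when the installed pyarrow version is > 0.14.0.
def arrow_ok_for_spark_pre_245(pyarrow_version: str) -> bool:
    parts = tuple(int(p) for p in pyarrow_version.split(".")[:3])
    parts += (0,) * (3 - len(parts))  # pad e.g. "0.14" -> (0, 14, 0)
    return parts <= (0, 14, 0)


print(arrow_ok_for_spark_pre_245("0.14.0"))  # True
print(arrow_ok_for_spark_pre_245("0.15.1"))  # False
```

In practice this means pinning the dependency (e.g. `pip install "pyarrow<0.15"`) on older Spark clusters; the Spark 2.4.x migration notes also describe setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1` as a workaround for newer pyarrow releases.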

Comments:
