Why is PySpark failing when running a pandas_udf?
Posted: 2019-08-08 12:43:26

Question:

I get this error when running a pandas UDF in PySpark. This is the UDF, which uses the external library textdistance:
import textdistance

def algoritmos_comparacion(num_serie_rec, num_serie_exp):
    d = textdistance.hamming(num_serie_rec, num_serie_exp)
    return str(d)
Then I register the function:
algoritmos_comparacion_udf = f.pandas_udf(algoritmos_comparacion, StringType())
Finally I use the UDF:
df.withColumn("hamming", algoritmos_comparacion_udf(f.col("num_serie_exp"), f.col("num_serie_rec")))
I have pandas installed, and pyarrow version 0.8.0. I get this error:
TypeError: 'Series' objects are mutable, thus they cannot be hashed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/worker.py", line 235, in main
process()
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/worker.py", line 230, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
for series in iterator:
File "<string>", line 1, in <lambda>
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
return lambda *a: (verify_result_length(*a), arrow_return_type)
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/worker.py", line 83, in verify_result_length
result = f(*a)
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/pyspark.zip/pyspark/util.py", line 55, in wrapper
return f(*args, **kwargs)
File "/home/bguser/SII-IVA/jobs/caso3/caso3.py", line 39, in algoritmos_comparacion
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/virtualenv_application_1563894657824_0447_0/lib/python3.6/site-packages/textdistance/algorithms/edit_based.py", line 49, in __call__
result = self.quick_answer(*sequences)
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/virtualenv_application_1563894657824_0447_0/lib/python3.6/site-packages/textdistance/algorithms/base.py", line 91, in quick_answer
if self._ident(*sequences):
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/virtualenv_application_1563894657824_0447_0/lib/python3.6/site-packages/textdistance/algorithms/base.py", line 110, in _ident
if e1 != e2:
File "/DATOS/var/log/hadoop/yarn/local/usercache/bguser/appcache/application_1563894657824_0447/container_e66_1563894657824_0447_01_000002/virtualenv_application_1563894657824_0447_0/lib/python3.6/site-packages/pandas/core/generic.py", line 1556, in __nonzero__
self.__class__.__name__
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
How can I fix this error?
To reproduce it, you can run a pandas_udf with any algorithm from the textdistance library. For example:
import textdistance
import pyspark.sql.functions as f
from pyspark.sql.types import MapType, StringType

def algoritmos_comparacion(num_serie_rec, num_serie_exp):
    data = {}
    algoritmos = {
        "hamming": textdistance.hamming,
        "levenshtein": textdistance.levenshtein,
        "damerau_levenshtein": textdistance.damerau_levenshtein,
        "jaro": textdistance.jaro,
        "mlipns": textdistance.mlipns,
        "strcmp95": textdistance.strcmp95,
        "needleman_wunsch": textdistance.needleman_wunsch,
        "gotoh": textdistance.gotoh,
        "smith_waterman": textdistance.smith_waterman,
    }
    for name, alg in algoritmos.items():
        try:
            data[name] = str(alg(num_serie_rec, num_serie_exp))
        except Exception:
            data[name] = "ERROR"
    return data

algoritmos_comparacion_udf = f.pandas_udf(algoritmos_comparacion, MapType(StringType(), StringType()))
dataframe.withColumn("algorithms", algoritmos_comparacion_udf(f.col("a"), f.col("b")))
Thanks.
Comments:

Possible duplicate of Truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()
Are you passing iterables through col("num_serie_exp") and col("num_serie_rec")?
@mazaneicha It is not a duplicate, because I am not using any boolean operators.
Could you create an MVCE? That would make it easier to help you.
@johnckane Yes, by using apply with a lambda inside the pandas udf and a plain Python function: func_udf = f.pandas_udf(lambda s: s.apply(func))

Answer 1:

This solved the problem:
algoritmos_comparacion_udf = f.pandas_udf(lambda s: s.apply(algoritmos_comparacion), MapType(StringType(), StringType()))
dataframe.withColumn("algorithms", algoritmos_comparacion_udf(f.col("a"), f.col("b")))
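The key point is that a scalar pandas_udf receives whole pandas Series (one per input column), so the element-wise application has to be spelled out somewhere. A minimal sketch of the same idea for the two-column case from the question, using pandas.Series.combine (hamming_udf is just an illustrative name; it assumes the two-string algoritmos_comparacion shown at the top of the question and a scalar pandas_udf in the Spark 2.3/2.4 style):

import pyspark.sql.functions as f
from pyspark.sql.types import StringType

# Each argument arrives as a pandas.Series; combine() calls the plain
# Python function once per pair of values and returns a new Series.
hamming_udf = f.pandas_udf(
    lambda s1, s2: s1.combine(s2, algoritmos_comparacion),
    StringType())

df = df.withColumn("hamming", hamming_udf(f.col("num_serie_exp"), f.col("num_serie_rec")))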
Discussion:

It seems MapType is no longer supported (docs.databricks.com/spark/latest/spark-sql/…). I can do the calculation with a regular Spark SQL udf instead of a pandas_udf.
Of course, you would then have to use one of the other return types that pandas_udf does support.
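For reference, a minimal sketch of the row-at-a-time Spark SQL udf route mentioned above, assuming the dictionary-returning algoritmos_comparacion from the question's reproduction code (algoritmos_comparacion_sql_udf is just an illustrative name); unlike a scalar pandas_udf, a regular udf receives one plain Python value per row, so a MapType return type works unchanged:

import pyspark.sql.functions as f
from pyspark.sql.types import MapType, StringType

# A regular udf gets plain Python values row by row, so the function
# from the question can be reused as-is and may return a dict.
algoritmos_comparacion_sql_udf = f.udf(
    algoritmos_comparacion, MapType(StringType(), StringType()))

dataframe = dataframe.withColumn(
    "algorithms",
    algoritmos_comparacion_sql_udf(f.col("a"), f.col("b")))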