pandas_udf giving error related to pyarrow


Posted: 2020-08-31 11:55:22

Question:

I have a DataFrame, and I want to use the polyline library in PySpark to get the lat_long for the given geolocations:

+-----------------+--------------------+----------+                             
|              vid|        geolocations| trip_date|
+-----------------+--------------------+----------+
|58AC21B17LU006754|eurnE||yqU???????...|2020-02-22|
|2T3EWRFV0LW060632|uocbGfjniOK[Fs@rC...|2020-02-25|
|JTDP4RCE0LJ014008|wwtFpdxtM????Q_@...|2020-02-25|
|4T1BZ1HK8KU029845|rz_Dp~hhN?@?@???...|2020-03-03|

I am using pandas_udf, and Apache Arrow is enabled:

import polyline
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")

lat_long_udf = pandas_udf(lambda geoloc: polyline.decode(geoloc)[0], ArrayType(StringType()))
df1 = df.withColumn('lat_long', lat_long_udf(df.geolocations))

Calling df.count() returns a result, but running df.show() fails with the following error:

 248, in init_stream_yield_batches
    for series in iterator:
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in <lambda>
    verify_result_type(f(*a)), len(a[0])), arrow_return_type)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 1, in <lambda>
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/__init__.py", line 16, in decode
    return PolylineCodec().decode(expression, precision, geojson)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/codec.py", line 43, in decode
    lat_change, index = self._trans(expression, index)
  File "/Users/prantik.pariksha/opt/anaconda3/lib/python3.8/site-packages/polyline/codec.py", line 31, in _trans
    byte = ord(value[index]) - 63
TypeError: ord() expected a character, but string of length 87 found

>>> print(pandas.__version__)
1.1.1
>>> print(numpy.__version__)
1.19.1
>>> import pyarrow
>>> print(pyarrow.__version__)
1.0.1


Answer 1:

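You are most likely getting this error because a pandas_udf receives a pandas Series as input, and you are applying the decode function directly to that Series instead of to each value in it.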
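To see where the traceback's `TypeError` comes from without Spark, here is a minimal sketch using only pandas. It assumes, as described above, that the UDF receives a whole `pd.Series` batch. `first_char_code` is a hypothetical stand-in for the first step of the polyline decoder, which (per the traceback) does `ord(value[index]) - 63`.

```python
import pandas as pd


def first_char_code(value, index=0):
    # Hypothetical stand-in for the decoder's first step:
    # take one character and convert it with ord().
    return ord(value[index]) - 63


batch = pd.Series(["~sqU__pR_jpv@_pR", "_~t[__pR~qy@_pR"])

# Per value: value[0] is a single character, so ord() works.
per_value = batch.apply(first_char_code)

# Whole Series: batch[0] is the first *string* of the batch, not a character,
# so ord() raises the same kind of TypeError as in the traceback.
try:
    first_char_code(batch)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(per_value.tolist())
print(error_message)
```

In the question, the first string of the batch was 87 characters long, hence "string of length 87 found".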

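For example, in the code below I expanded your lambda function a little so you can see this. I take the pandas Series, apply polyline.decode to each value in it, and return the resulting Series. Note that I also changed the return type to ArrayType(DoubleType()) instead of ArrayType(StringType()), since the decoded coordinates are floats.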

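import pandas as pd
import polyline

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

# ....

df = spark.createDataFrame([["~sqU__pR_jpv@_pR"], ["_~t[__pR~qy@_pR"]], ["geolocations"])


@pandas_udf(ArrayType(DoubleType()))
def lat_long_udf(s: pd.Series) -> pd.Series:
  # s is a whole batch of encoded polylines: decode each value separately
  # and keep only the first (lat, lng) point of each one.
  return s.apply(lambda x: polyline.decode(x)[0])


df1 = df.withColumn('decoded', lat_long_udf(df.geolocations))
df1.collect()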
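The per-value logic inside `lat_long_udf` can also be checked locally with plain pandas before running it in Spark. This sketch swaps in a small pure-Python decoder for Google's Encoded Polyline Algorithm Format; `decode_polyline` and `first_points` are hypothetical helpers (not the `polyline` package's API), used only so the example runs without extra dependencies. Inside the real UDF you would keep calling `polyline.decode`.

```python
import pandas as pd


def decode_polyline(encoded: str, precision: int = 5) -> list:
    """Decode a Google encoded polyline into a list of (lat, lng) tuples.

    Hypothetical pure-Python stand-in for polyline.decode.
    """
    factor = 10 ** precision
    coords = []
    index = lat = lng = 0
    while index < len(encoded):
        deltas = []
        for _ in range(2):  # latitude delta first, then longitude delta
            result = shift = 0
            while True:
                # Each character carries 5 bits; a value >= 0x20 means more chunks follow.
                byte = ord(encoded[index]) - 63
                index += 1
                result |= (byte & 0x1F) << shift
                shift += 5
                if byte < 0x20:
                    break
            # Undo the sign encoding: an odd result means a negative delta.
            deltas.append(~(result >> 1) if result & 1 else result >> 1)
        lat += deltas[0]
        lng += deltas[1]
        coords.append((lat / factor, lng / factor))
    return coords


def first_points(s: pd.Series) -> pd.Series:
    # Same shape as the body of lat_long_udf: one string in, one point out, per row.
    return s.apply(lambda x: decode_polyline(x)[0])


batch = pd.Series(["~sqU__pR_jpv@_pR", "_~t[__pR~qy@_pR"])
print(first_points(batch).tolist())  # [(-3.7, 3.2), (4.7, 3.2)]
```

Because `Series.apply` calls the function once per element, each call sees a single string, which is exactly what the decoder expects.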

