使用 ArrayType 列将 UDF 重写为 pandas udf

Posted

技术标签:

【中文标题】使用 ArrayType 列将 UDF 重写为 pandas udf【英文标题】:Rewrite UDF to pandas udf with ArrayType column 【发布时间】:2020-11-22 23:14:38 【问题描述】:

我正在尝试将 UDF 重写为 pandas UDF。

但是,当涉及到其中包含 ArrayType 的列时。我正在努力寻找正确的解决方案。

我有一个如下的数据框:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[6, 22, 42, 47, 5...|
|    Romance|[3, 7, 11, 15, 17...|
|   Thriller|[6, 10, 16, 18, 2...|
|  Adventure|[2, 8, 10, 15, 29...|
|   Children|[1, 2, 8, 13, 34,...|
|      Drama|[4, 11, 14, 16, 1...|
|        War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
|    Fantasy|[2, 56, 60, 126, ...|
|    Mystery|[59, 113, 123, 16...|
+-----------+--------------------+

以下 UDF 运行良好:

pairs_udf = udf(lambda x: itertools.combinations(x, 2), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))

输出如下:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[[6, 22], [6, 42]...|
|    Romance|[[3, 7], [3, 11],...|
|   Thriller|[[6, 10], [6, 16]...|
|  Adventure|[[2, 8], [2, 10],...|
|   Children|[[1, 2], [1, 8], ...|
|      Drama|[[4, 11], [4, 14]...|
|        War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
|    Fantasy|[[2, 56], [2, 60]...|
|    Mystery|[[59, 113], [59, ...|
+-----------+--------------------+

但是,在pandas udf 中编写函数时,什么是等效的。

PS:我明白,或者,我可以使用交叉连接来实现相同的结果。

但是,我更好奇 pandas udf 如何处理带有 ArrayType 的列。

【问题讨论】:

可能类似于lambda row: row.apply(lambda x: itertools.combinations(x, 2)) 谢谢@mck,这是我的尝试之一。现在我认为我遇到的问题与java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available 更相关。经过几次谷歌搜索,它似乎与 java 11 和 spark-arrow 支持有关。这可能属于一个单独的问题。 【参考方案1】:

我将在这里分享我的发现:

为了让 pandas udf 为您的项目工作,有 3 个方面:

1。 pandas UDF,或者更准确地说,Apache Arrow 不像常见的 udf 那样支持复杂类型。(截至pyspark 3.0.1pyarrow 2.0.0

例如:

ArrayType(StringType()) 受 pandas udf 支持。 不支持ArrayType(StructType([...]))。 您可以查看更多:https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types

2。如果您运行的是 Java 11,这是 (py)Spark 3 中的默认设置。您需要在 spark 配置中添加以下内容:

spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'

这将解决上面提到的java.lang.UnsupportedOperationException

3。确保将您的虚拟环境 python 路径添加到您的 pyspark_python

environ['PYSPARK_PYTHON']='./your/virtual/enviroment/path'

【讨论】:

以上是关于使用 ArrayType 列将 UDF 重写为 pandas udf的主要内容,如果未能解决你的问题,请参考以下文章

从 UDF 返回 StructType 的 ArrayType 时出错(并在多个 UDF 中使用单个函数)

如何在pyspark中将字符串值转换为arrayType

spark中混合数据的ArrayType

Scala Spark 中的 udf 运行时错误

如何为数据框中的复杂列创建包含数组(案例类)的udf

使用 UDF 及其性能的 Spark Scala 数据集验证