如何在 Pyspark 中启用 Apache Arrow

Posted

技术标签:

【中文标题】如何在 Pyspark 中启用 Apache Arrow【英文标题】:how to enable Apache Arrow in Pyspark 【发布时间】:2020-02-04 17:21:34 【问题描述】:

我正在尝试启用 Apache Arrow 以转换为 Pandas。我正在使用:

pyspark 2.4.4 pyarrow 0.15.0 熊猫 0.25.1 numpy 1.17.2

这是示例代码

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
x = pd.Series([1, 2, 3])
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

我收到了这条警告信息

c:\users\administrator\appdata\local\programs\python\python37\lib\site-packages\pyspark\sql\session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
  An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile.
: java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
    at org.apache.spark.sql.api.python.PythonSQLUtils$.readArrowStreamFromFile(PythonSQLUtils.scala:46)
    at org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile(PythonSQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
  warnings.warn(msg)

【问题讨论】:

pandasUDF and pyarrow 0.15.0的可能重复 【参考方案1】:

用于在我的 Spark 2.4.4 集群中使用 pyarrow==0.15 调用我的 pandas UDF。我努力成功地设置了上面提到的ARROW_PRE_0_15_IPC_FORMAT=1 标志。

我在 (1) 命令行中通过 export 在头节点上设置标志,(2) 通过 spark-env.shyarn-env.sh 在集群中的所有节点上,以及 (3) 在 pyspark 代码本身中来自我在头节点上的脚本。由于未知原因,这些都无法在 udf 中实际设置此标志。

我找到的最简单的解决方案是将其 inside 称为 udf:

    @pandas_udf("integer", PandasUDFType.SCALAR)
    def foo(*args):
        import os
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        #...

希望这可以为其他人节省几个小时。

【讨论】:

谢谢!其他解决方案对我们不起作用,但确实如此。 AWS EMR 2.4.x,所以在 EMR 上的配置设置可能不像在其他 spark 解决方案上那么容易。 这为我节省了几个小时。出于某种原因,我的开发端点有 pyarrow 0.13.0 工作正常,而 AWS Glue 集群有 0.16.0 失败。在我们正确修复它之前,此解决方案是一个很好的解决方法。 在启动您的工作时使用--conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1。如果您使用的是 AWS Glue,请使用“作业参数”部分(键/值属性)传递它。键为--conf,值为spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1【参考方案2】:

我们在 0.15.0 中进行了更改,使 pyarrow 的默认行为与 Java 中的旧版本 Arrow 不兼容——您的 Spark 环境似乎使用的是旧版本。

你的选择是

在您使用 Python 的位置设置环境变量 ARROW_PRE_0_15_IPC_FORMAT=1 暂时降级到 pyarrow

【讨论】:

哇,这已经困扰我好几天了!我有一个非常简单的带有 PandasUDF 方法的 groupBy.apply 语句,它在以前没有的地方不断失败。原来构建环境已经开始使用 0.15 ><.> 这就是答案。请在下面打勾:)谢谢@Wes McKinney 这个答案仍然是最新的吗?

以上是关于如何在 Pyspark 中启用 Apache Arrow的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark:如何在Python 3中使用pyspark

如何在 Apache livy 中提交 pyspark 作业?

如何在 Apache Spark (pyspark) 中使用自定义类?

如何在 Apache Spark (PySpark 1.4.1) 中可视化/绘制决策树?

如何在 apache toree pyspark notebook 中更改 python 版本?

如何在 Apache Spark 中向 Kryo Serializer 注册类?