如何在 Scala Spark 项目中使用 PySpark UDF?

Posted

技术标签:

【中文标题】如何在 Scala Spark 项目中使用 PySpark UDF?【英文标题】:How to use a PySpark UDF in a Scala Spark project? 【发布时间】:2019-01-25 09:28:24 【问题描述】:

很多人(1、2、3)讨论了在 PySpark 应用程序中使用 Scala UDF,通常是出于性能原因。我对相反的情况感兴趣 - 在 Scala Spark 项目中使用 python UDF。

我特别感兴趣的是使用 sklearn(和 MLFlow)构建模型,然后将其有效地应用于 Spark 流作业中的记录。我知道我还可以在 REST API 和 make calls to that API in the Spark streaming application 后面托管 Python 模型 mapPartitions,但是管理该任务的并发性并为托管模型设置 API 并不是我非常兴奋的事情。

如果没有像 Py4J 这样的太多自定义开发,这可能吗?这只是个坏主意吗?

谢谢!

【问题讨论】:

这是可能的,但绝对不支持也不简单。所以问题真的是你为什么要尝试。真的很难为这样的过程找到合理的理由。 @user6910411 感谢您的回复。我解释了问题中的用例 - 我想使用我使用 sklearn 训练的模型来评估结构化流应用程序中的各个行。 我想问题是——如果你已经想为跨语言通信付出代价,为什么不一直使用 PySpark? 在这种情况下,因为 1)python 操作将是一个更大的 Spark 作业的一小部分,我宁愿不为整个事情支付 PySpark 罚款,并且 2)我已经有一个成熟的 Scala 项目,只是想添加一点 python 而不需要重写。 不作为答案提交,但如果您使用 Databricks,您可以在同一个工作中使用 Scala 和 Python。您可以使用 sklearn 跳转到该 UDF 的 Pyspark,对记录进行评分,然后立即转换回下游的 Scala。公共层是 SparkSQL 中的 Spark 表。 Pyspark 可以读取和写入它们,以及 Scala 中的 Spark(显然)。不确定如何使用纯开源来做到这一点,或者 Zeppelin 之类的东西是否支持这一点。 (完全披露,我为 Databricks 工作) 【参考方案1】:

也许我迟到了,但至少我可以为后代提供帮助。这实际上可以通过创建您的python udf 并将其注册到spark.udf.register("my_python_udf", foo) 来实现。你可以在这里查看文档https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register

然后可以在 Python、Scala、Java、R 或任何语言中从 sqlContext 调用此函数,因为您正在直接访问 sqlContext(其中注册了 udf)。例如,您可以调用类似

spark.sql("SELECT my_python_udf(...)").show()

优点 - 你可以从 Scala 调用你的 sklearn 模型。

缺点 - 您必须使用 sqlContext 并编写 SQL 样式查询。

我希望这会有所帮助,至少对任何未来的访问者都是如此。

【讨论】:

谢谢。看起来我们应该能够将 python zip 与主 jar 一起提交用于 spark 作业,并将这些 python zip 用作依赖项。 我认为您说的是在 Python 进程中有上下文,注册 UDF,然后在可以访问它的 JVM 中重用上下文的情况。这在 Databricks 笔记本中是可能的,但当我有一个从 spark-submit 开始的工作时则不行。

以上是关于如何在 Scala Spark 项目中使用 PySpark UDF?的主要内容,如果未能解决你的问题,请参考以下文章

Python:将熊猫数据框保存到镶木地板文件

如何在 Spark/Scala 中使用 countDistinct?

如何使用scala+spark读写hbase?

如何在 Windows 中使用 Scala 将 Cassandra 与 Spark 连接起来

如何使用 Scala 在 Spark 中进行滑动窗口排名?

如何在窗口 scala/spark 中使用 partitionBy 函数