如何在 Scala Spark 项目中使用 PySpark UDF?
Posted
技术标签:
【中文标题】如何在 Scala Spark 项目中使用 PySpark UDF?【英文标题】:How to use a PySpark UDF in a Scala Spark project? 【发布时间】:2019-01-25 09:28:24 【问题描述】:很多人(1、2、3)讨论了在 PySpark 应用程序中使用 Scala UDF,通常是出于性能原因。我对相反的情况感兴趣 - 在 Scala Spark 项目中使用 python UDF。
我特别感兴趣的是使用 sklearn(和 MLFlow)构建模型,然后将其有效地应用于 Spark 流作业中的记录。我知道我还可以在 REST API 和 make calls to that API in the Spark streaming application 后面托管 Python 模型 mapPartitions
,但是管理该任务的并发性并为托管模型设置 API 并不是我非常兴奋的事情。
如果没有像 Py4J 这样的太多自定义开发,这可能吗?这只是个坏主意吗?
谢谢!
【问题讨论】:
这是可能的,但绝对不支持也不简单。所以问题真的是你为什么要尝试。真的很难为这样的过程找到合理的理由。 @user6910411 感谢您的回复。我解释了问题中的用例 - 我想使用我使用 sklearn 训练的模型来评估结构化流应用程序中的各个行。 我想问题是——如果你已经想为跨语言通信付出代价,为什么不一直使用 PySpark? 在这种情况下,因为 1)python 操作将是一个更大的 Spark 作业的一小部分,我宁愿不为整个事情支付 PySpark 罚款,并且 2)我已经有一个成熟的 Scala 项目,只是想添加一点 python 而不需要重写。 不作为答案提交,但如果您使用 Databricks,您可以在同一个工作中使用 Scala 和 Python。您可以使用 sklearn 跳转到该 UDF 的 Pyspark,对记录进行评分,然后立即转换回下游的 Scala。公共层是 SparkSQL 中的 Spark 表。 Pyspark 可以读取和写入它们,以及 Scala 中的 Spark(显然)。不确定如何使用纯开源来做到这一点,或者 Zeppelin 之类的东西是否支持这一点。 (完全披露,我为 Databricks 工作) 【参考方案1】:也许我迟到了,但至少我可以为后代提供帮助。这实际上可以通过创建您的python udf
并将其注册到spark.udf.register("my_python_udf", foo)
来实现。你可以在这里查看文档https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register
然后可以在 Python、Scala、Java、R 或任何语言中从 sqlContext
调用此函数,因为您正在直接访问 sqlContext
(其中注册了 udf
)。例如,您可以调用类似
spark.sql("SELECT my_python_udf(...)").show()
优点 - 你可以从 Scala 调用你的 sklearn
模型。
缺点 - 您必须使用 sqlContext
并编写 SQL
样式查询。
我希望这会有所帮助,至少对任何未来的访问者都是如此。
【讨论】:
谢谢。看起来我们应该能够将 python zip 与主 jar 一起提交用于 spark 作业,并将这些 python zip 用作依赖项。 我认为您说的是在 Python 进程中有上下文,注册 UDF,然后在可以访问它的 JVM 中重用上下文的情况。这在 Databricks 笔记本中是可能的,但当我有一个从 spark-submit 开始的工作时则不行。以上是关于如何在 Scala Spark 项目中使用 PySpark UDF?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark/Scala 中使用 countDistinct?