如何在 Pyspark 中使用 Scala 函数?
Posted
技术标签:
【中文标题】如何在 Pyspark 中使用 Scala 函数?【英文标题】:How to use a Scala function inside Pyspark? 【发布时间】:2020-12-23 19:50:51 【问题描述】:我一直在寻找是否有任何方法可以在 Pyspark 中使用 Scala 函数,但我没有找到任何有关此主题的文档或指南。
我的目标是使用之前人们定义的scala函数appendPrvlngFields隐式函数。然后我想在python环境中使用这个函数而不重新定义它,但是通过一些类型的方式,比如注册scala函数
假设我在 Scala 中创建了一个使用用户定义库的简单对象,例如:
%scala
package com.Example
import org.library.DataFrameImplicits._
import org.apache.spark.sql.DataFrame
object ScalaFunction
def appendPrvlngFields(df: DataFrame,
otherDF: DataFrame,
colsToAppend: List[(String)] = List[(String)](),
mapColName: Option[String] = None,
partitionByCols: List[(String)],
sort: List[(String)],
sortBfirst: Boolean = false,
subsequent: Boolean = false,
existingPartitionsOnly: Boolean = false,
otherDFPrefix: String = "prvlg",
enforceLowerCase: Boolean = false
): DataFrame =
return df.appendPrvlngFields(otherDF,
colsToAppend,
mapColName,
partitionByCols,
sort,
sortBfirst,
subsequent,
existingPartitionsOnly,
otherDFPrefix,
enforceLowerCase
)
然后在python环境中,我通过定义这个函数来调用函数appendPrvlngFields:
def pyAppendPrvlngFields(df: DataFrame,
otherDF: DataFrame,
colsToAppend: list,
partitionByCols: list,
sort: list,
mapColName = None,
sortBfirst = False,
subsequent = False,
existingPartitionsOnly = False,
otherDFPrefix = "prvlg",
enforceLowerCase = False) -> DataFrame:
return(DataFrame(sc._jvm.com.SRMG.ScalaPySpark.appendPrvlngFields(df._jdf,
otherDF._jdf,
colsToAppend,
mapColName,
partitionByCols,
sort,sortBfirst,
subsequent),
sqlContext))
我知道我需要将 df 转换为 df._jdf,但是如何将列表、字符串、Option、Boolean 转换为 java 类型?
【问题讨论】:
我认为与_jdf
之类的东西混在一起可能不合适 - 请参阅此链接以了解您的用例:spark.apache.org/docs/latest/api/python/…
这能回答你的问题吗? How to call scala from python?
no... 我想我现在缺少的一步是如何将列表、字符串、选项、布尔值转换为 java 类型,就像我将 df 转换为 df._jdf 一样跨度>
【参考方案1】:
首先,您将 ScalaUDF 代码分成单独的项目并创建 jar 文件。 接下来,将其传递给 python。 这是怎么做的。
# create the jar using SBT
sbt clean assembly
# Pass the jar to the PySpark session
pyspark --jars [path/to/jar/x.jar]
参考:https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9
【讨论】:
您好,感谢您提供这个解决方案,但是我没有从这个参考网站找到好的解决方案。以上是关于如何在 Pyspark 中使用 Scala 函数?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame
如何在 AWS EMR 中一起添加 2 个(pyspark、scala)步骤?
如何在 Scala Spark 项目中使用 PySpark UDF?