如何在 Pyspark 中使用 Scala 函数?

Posted

技术标签:

【中文标题】如何在 Pyspark 中使用 Scala 函数?【英文标题】:How to use a Scala function inside Pyspark? 【发布时间】:2020-12-23 19:50:51 【问题描述】:

我一直在寻找是否有任何方法可以在 Pyspark 中使用 Scala 函数,但我没有找到任何有关此主题的文档或指南。

我的目标是使用之前人们定义的scala函数appendPrvlngFields隐式函数。然后我想在python环境中使用这个函数而不重新定义它,但是通过一些类型的方式,比如注册scala函数

假设我在 Scala 中创建了一个使用用户定义库的简单对象,例如:

%scala 
package com.Example
import org.library.DataFrameImplicits._
import org.apache.spark.sql.DataFrame


object ScalaFunction 

    def appendPrvlngFields(df: DataFrame,
                          otherDF: DataFrame,
                          colsToAppend: List[(String)] = List[(String)](),
                          mapColName: Option[String] = None,
                          partitionByCols: List[(String)],
                          sort: List[(String)],
                          sortBfirst: Boolean = false,
                          subsequent: Boolean = false,
                          existingPartitionsOnly: Boolean = false,
                          otherDFPrefix: String = "prvlg",
                          enforceLowerCase: Boolean = false
                          ): DataFrame = 
      
                          return df.appendPrvlngFields(otherDF,
                                                      colsToAppend,
                                                      mapColName,
                                                      partitionByCols,
                                                      sort,
                                                      sortBfirst,
                                                      subsequent,
                                                      existingPartitionsOnly,
                                                      otherDFPrefix,
                                                      enforceLowerCase
                                                      )
                          
                       

然后在python环境中,我通过定义这个函数来调用函数appendPrvlngFields:

def pyAppendPrvlngFields(df: DataFrame, 
                         otherDF: DataFrame, 
                         colsToAppend: list,  
                         partitionByCols: list, 
                         sort: list, 
                         mapColName = None,
                         sortBfirst = False, 
                         subsequent = False, 
                         existingPartitionsOnly = False,
                         otherDFPrefix = "prvlg",
                         enforceLowerCase = False) -> DataFrame:
  
  return(DataFrame(sc._jvm.com.SRMG.ScalaPySpark.appendPrvlngFields(df._jdf,
                                                     otherDF._jdf,
                                                     colsToAppend,  
                                                     mapColName,
                                                     partitionByCols, 
                                                     sort,sortBfirst,
                                                     subsequent),
    sqlContext))

我知道我需要将 df 转换为 df._jdf,但是如何将列表、字符串、Option、Boolean 转换为 java 类型?

【问题讨论】:

我认为与_jdf 之类的东西混在一起可能不合适 - 请参阅此链接以了解您的用例:spark.apache.org/docs/latest/api/python/… 这能回答你的问题吗? How to call scala from python? no... 我想我现在缺少的一步是如何将列表、字符串、选项、布尔值转换为 java 类型,就像我将 df 转换为 df._jdf 一样跨度> 【参考方案1】:

首先,您将 ScalaUDF 代码分成单独的项目并创建 jar 文件。 接下来,将其传递给 python。 这是怎么做的。

# create the jar using SBT
sbt clean assembly

# Pass the jar to the PySpark session
pyspark --jars [path/to/jar/x.jar]

参考:https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9

【讨论】:

您好,感谢您提供这个解决方案,但是我没有从这个参考网站找到好的解决方案。

以上是关于如何在 Pyspark 中使用 Scala 函数?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

udf(用户定义函数)如何在 pyspark 中工作?

如何在 AWS EMR 中一起添加 2 个(pyspark、scala)步骤?

如何在 Scala Spark 项目中使用 PySpark UDF?

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用