从 Scala Spark 代码调用 Pyspark 脚本

Posted

技术标签:

【中文标题】从 Scala Spark 代码调用 Pyspark 脚本【英文标题】:Invoking Pyspark script from Scala Spark Code 【发布时间】:2021-08-12 19:27:40 【问题描述】:

我有一个 Scala Spark 应用程序并想调用 pySpark/python (pyspark_script.py) 进行进一步处理。

有多种资源可以在 Python 中使用 Java/Scala 代码,但我正在寻找 scala->Pyspark

我探索了用于 Scala/Java 的 Jython 以包含 Python 代码,如下所示:

PythonInterpreter.initialize(System.getProperties, properties, sysArgs)
val pi = new PythonInterpreter()
pi.execfile("path/to/pyscript/mypysparkscript.py")

我看到错误提示:“ImportError: No module named pyspark”

有什么方法可以让 Scala spark 使用相同的 sparkContext/session 与 PYSpark 对话?

【问题讨论】:

【参考方案1】:

您可以在 scala 中使用 process 对象运行 shell 命令。

// Spark codes goes here .....
// Call pyspark code 
import sys.process._
"python3 /path/to/python/file.py.!!

要使用相同的会话,请在 python 文件中添加以下行。

spark = SparkSession.builder.getOrCreate()

你也可以使用getActiveSession()方法。

注意:确保您安装了 pyspark 模块。 你可以使用pip3 install pyspark 命令来做到这一点。

【讨论】:

感谢您提出这种方法,但是使用这种方法,当我运行脚本时,我看到错误“找不到 jdbc 驱动程序的类”。在使用 pyspark 运行时,我能够指向罐子。我试过了,在 scala 中也添加了 jar 和依赖项,但没有运气。 你是否在 $SPARK_HOME/jars 文件夹中添加了 jdbc jar? 这有帮助,我确实添加了 jars 文件夹。谢谢!另一个障碍是“py4j.protocol.Py4JJavaError: An error occurred while calling o405.javaToPython.: java.lang.IllegalArgumentException”。我在 Intellij 上运行 java 8 版本 请用示例代码更新您的问题并标记您遇到错误的行。 在我清除了 intellJ 上的环境变量后,它起作用了。感谢您的帮助!

以上是关于从 Scala Spark 代码调用 Pyspark 脚本的主要内容,如果未能解决你的问题,请参考以下文章

使用 Scala 从 Spark 的 withColumn 中调用 udf 时出错

如何使用反射从scala调用spark UDF?

Scala Spark - 调用 createDataFrame 时获取重载方法

从 Spark 连接到 BigTable 时出现 Jetty ALPN/NPN 异常,scala 代码

通过python扩展spark mllib 算法包(e.g.基于spark使用孤立森林进行异常检测)

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD