教你如何在Spark Scala/Java应用中调用Python脚本
Posted 华为云开发者社区
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了教你如何在Spark Scala/Java应用中调用Python脚本相关的知识,希望对你有一定的参考价值。
摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java程序调用的过程也大体相同。
本文分享自华为云社区《【Spark】如何在Spark Scala/Java应用中调用Python脚本》,作者:小兔子615 。
1.PythonRunner
对于运行与 JVM 上的程序(即Scala、Java程序),Spark 提供了 PythonRunner 类。只需要调用PythonRunner 的main方法,就可以在Scala或Java程序中调用Python脚本。在实现上,PythonRunner 基于py4j ,通过构造GatewayServer实例让python程序通过本地网络socket来与JVM通信。
// Launch a Py4J gateway server for the process to connect to; this will let it see our // Java system properties and such val localhost = InetAddress.getLoopbackAddress() val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder() .authToken(secret) .javaPort(0) .javaAddress(localhost) .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret) .build() val thread = new Thread(new Runnable() override def run(): Unit = Utils.logUncaughtExceptions gatewayServer.start() ) thread.setName("py4j-gateway-init") thread.setDaemon(true) thread.start() // Wait until the gateway server has started, so that we know which port is it bound to. // `gatewayServer.start()` will start a new thread and run the server code there, after // initializing the socket, so the thread started above will end as soon as the server is // ready to serve connections. thread.join()
在启动GatewayServer后,再通过ProcessBuilder构造子进程执行Python脚本,等待Python脚本执行完成后,根据exitCode判断是否执行成功,若执行失败则抛出异常,最后关闭gatewayServer。
// Launch Python process val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) try val process = builder.start() new RedirectThread(process.getInputStream, System.out, "redirect output").start() val exitCode = process.waitFor() if (exitCode != 0) throw new SparkUserAppException(exitCode) finally gatewayServer.shutdown()
2.调用方法
2.1 调用代码
PythonRunner的main方法中需要传入三个参数:
- pythonFile:执行的python脚本
- pyFiles:需要添加到PYTHONPATH的其他python脚本
- otherArgs:传入python脚本的参数数组
val pythonFile = args(0) val pyFiles = args(1) val otherArgs = args.slice(2, args.length)
具体样例代码如下,scala样例代码:
package com.huawei.bigdata.spark.examples import org.apache.spark.deploy.PythonRunner import org.apache.spark.sql.SparkSession object RunPythonExample def main(args: Array[String]) val pyFilePath = args(0) val pyFiles = args(1) val spark = SparkSession .builder() .appName("RunPythonExample") .getOrCreate() runPython(pyFilePath, pyFiles) spark.stop() def runPython(pyFilePath: String, pyFiles :String) : Unit = val inputPath = "-i /input" val outputPath = "-o /output" PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))
python样例代码:
#!/usr/bin/env python # coding: utf-8 import sys import argparse argparser = argparse.ArgumentParser(description="ParserMainEntrance") argparser.add_argument(\'--input\', \'-i\', help="input path", default=list(), required=True) argparser.add_argument(\'--output\', \'-o\', help="output path", default=list(), required=True) arglist = argparser.parse_args() def getTargetPath(input_path, output_path): try: print("input path: ".format(input_path)) print("output path: ".format(output_path)) return True except Exception as ex: print("error with: ".format(ex)) return False if __name__ == "__main__": ret = getTargetPath(arglist.input, arglist.output) if ret: sys.exit(0) else: sys.exit(1)
2.2 运行命令
执行python脚本需要设置pythonExec,即执行python脚本所使用的执行环境。默认情况下,使用的执行器为python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。
//Spark 2.4.5 val sparkConf = new SparkConf() val secret = Utils.createSecret(sparkConf) val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) .orElse(sparkConf.get(PYSPARK_PYTHON)) .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")) .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("python") //Spark 3.1.1 val sparkConf = new SparkConf() val secret = Utils.createSecret(sparkConf) val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) .orElse(sparkConf.get(PYSPARK_PYTHON)) .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")) .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("python3")
如果要手动指定pythonExec,需要在执行前设置环境变量(无法通过spark-defaults传入)。在cluster模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver端还可以通过export PYSPARK_PYTHON=python3 设置环境变量。
若需要上传pyhton包,可以通过 --archive python.tar.gz 的方式上传。
为了使应用能够获取到py脚本文件,还需要在启动命令中添加 --file pythonFile.py 将python脚本上传到 yarn 上。
运行命令参考如下:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py
如果需要使用其他python环境,而非节点上已安装的,可以通过 --archives 上传python压缩包,再通过环境变量指定pythonExec,例如:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
以上是关于教你如何在Spark Scala/Java应用中调用Python脚本的主要内容,如果未能解决你的问题,请参考以下文章