可从 PySpark/Python 调用的 Spark(2.3+)Java 函数 [重复]
Posted
技术标签:
【中文标题】可从 PySpark/Python 调用的 Spark(2.3+)Java 函数 [重复]【英文标题】:Spark (2.3+) Java functions callable from PySpark/Python [duplicate] 【发布时间】:2018-08-11 07:13:21 【问题描述】:关于 Spark Doc 2.3:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerJavaFunction
registerJavaFunction(name, javaClassName, returnType=None)[源代码]
将 Java 用户定义函数注册为 SQL 函数。
除了名称和函数本身之外,还可以>可选地指定返回类型。当没有指定返回类型时,我们将通过反射推断它。
参数:
name – 用户定义函数的名称
javaClassName – java 类的完全限定名
returnType – 注册的 Java 函数的返回类型。该值可以是 pyspark.sql.types.DataType 对象或 DDL 格式的类型字符串。
我的问题:
我想要一个包含大量 UDF 的库,用于 Spark 2.3+,全部用 Java 编写,并且都可以从 PySpark/Python 访问。
阅读我在上面链接的文档,似乎在类和 Java UDF 函数之间存在 一对一的映射(可从 PySpark 中的 Spark-SQL 调用)。 因此,如果我说 10 个 Java UDF 函数,那么我需要创建 10 个公共 Java 类,每个类有 1 个 UDF,以使它们可以从 PySpark/SQL 调用。
这是正确的吗?
我可以创建 1 个公共 Java 类并在 1 个类中放置许多不同的 UDF,并使所有 UDF 都可以从 Spark 2.3 中的 PySpark 调用吗?
这篇文章不提供任何 Java 示例代码来帮助解决我的问题。看起来一切都在 Scala 中。我想要这一切都用Java。 我是否需要扩展一个类或实现接口才能在 Java 中做到这一点? 任何指向要从 PySpark-SQL 调用的示例 Java 代码的链接都将不胜感激。
Spark: How to map Python with Scala or Java User Defined Functions?
【问题讨论】:
根本不是重复的,我问的是Java,链接回复中的所有代码都在Scala中。 【参考方案1】:因此,如果我说 10 个 Java UDF 函数,那么我需要创建 10 个公共 Java 类,每个类有 1 个 UDF,以使它们可以从 PySpark/SQL 调用。
这对吗?
是的,没错。但是你可以:
使用UserDefinedFunction
并按照Spark: How to map Python with Scala or Java User Defined Functions?所示进行接口
使用UDFRegistration.register
注册名为udfs
,然后通过Py4j为每个注册函数调用org.apache.spark.sql.functions.callUDF
。
【讨论】:
在发布我的问题之前,我已经阅读了您链接到的帖子。那里没有提及您的建议。请更具体。也许你可以提供一个Java的小例子?我的问题非常具体,我希望这一切都在 Java 中完成,不是 Scala。【参考方案2】:下面非常简单的 Java/Python/Pyspark 代码示例可能会对某人有所帮助,我在 Spark 2.3.1 和 Java 1.8 上运行了一个可从 Python 调用的 Java UDF。
请注意,这种方法在我看来非常麻烦,因为您需要为每个 Java UDF 单独的 Java 类。所以对于 50 个独立的 Java UDF = 50 个独立的公共 Java 类! 理想情况下,如果单个公共 Java 类可以包含多个单独的 Java UDF,所有这些都打包在一个 JAR 文件中,这将是理想的。 唉,我还是不知道该怎么做。
欢迎提出改进建议! 谢谢
// Java 8 code
package com.yourdomain.sparkUDF;
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF0;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public final class JavaUDFExample
implements UDF0<String>
@Override
public String call() throws Exception
return java.util.UUID.randomUUID().toString();
// end of Java code
// make a jar file from above including all referenced jar Spark libraries
# PySPark Python code below
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("Java UDF Example").getOrCreate()
df = spark.read.json(r"c:\temp\temperatures.json")
df.createOrReplaceTempView("citytemps")
spark.udf.registerJavaFunction("getGuid", "com.yourdomain.sparkUDF.JavaUDFExample", StringType())
spark.sql("SELECT getguid() as guid, * FROM citytemps").show()
# end of PySpark-SQL Python code
DOS shell script to run on local Spark:
spark-submit --jars c:\dir\sparkjavaudf.jar python-udf-example.py
【讨论】:
以上是关于可从 PySpark/Python 调用的 Spark(2.3+)Java 函数 [重复]的主要内容,如果未能解决你的问题,请参考以下文章
使用 react-aad-msal 缺少范围从 react SPA 调用 web api
ASP.Net Core 6,SPA,端点 - 我无法从 http get 请求调用控制器
延迟加载模块中的 Angular single-spa 延迟加载路由调用未定义的 webpack 错误
来自 Spark 安装的 Pyspark VS Pyspark python 包