scala中怎样调用unapplyseq
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala中怎样调用unapplyseq相关的知识,希望对你有一定的参考价值。
参考技术A scala里的object一般特指的是伴生对象,可以通过对象名直接调用其中的成员,类似Java中的static成员,如果不在当前作用域,需要import。从任务中调用 Java/Scala 函数
【中文标题】从任务中调用 Java/Scala 函数【英文标题】:Calling Java/Scala function from a task 【发布时间】:2015-10-19 12:18:22 【问题描述】:背景
我最初的问题是为什么在 map 函数中使用 DecisionTreeModel.predict
会引发异常? 并且与 How to generate tuples of (original lable, predicted label) on Spark with MLlib? 有关
当我们使用 Scala API a recommended way 获得对 RDD[LabeledPoint]
的预测时,使用 DecisionTreeModel
是简单地映射到 RDD
:
val labelAndPreds = testData.map point =>
val prediction = model.predict(point.features)
(point.label, prediction)
不幸的是,PySpark 中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。如需更多信息,请参阅SPARK-5063。
而不是 official documentation 推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
那么这里发生了什么?这里没有广播变量,Scala API 定义predict
如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
def predict(features: Vector): Double =
topNode.predict(features)
/**
* Predict values for the given data set using the model trained.
*
* @param features RDD representing data points to be predicted
* @return RDD of predictions for each of the given data points
*/
def predict(features: RDD[Vector]): RDD[Double] =
features.map(x => predict(x))
所以至少乍一看,从动作或转换调用不是问题,因为预测似乎是一种本地操作。
说明
经过一番挖掘,我发现问题的根源是从DecisionTreeModel.predict 调用的JavaModelWrapper.call
方法。调用Java函数需要accessSparkContext
:
callJavaFunc(self._sc, getattr(self._java_model, name), *a)
问题
在DecisionTreeModel.predict
的情况下,有一个推荐的解决方法,并且所有必需的代码都已经是 Scala API 的一部分,但是一般来说有什么优雅的方法来处理这样的问题吗?
目前只有我能想到的比较重量级的解决方案:
通过隐式转换扩展 Spark 类或添加某种包装器,将所有内容推送到 JVM 直接使用 Py4j 网关【问题讨论】:
这部分是正确的。我在将 Scala 中的相同代码实现放到 Python 中以用于决策树时遇到了同样的麻烦,并引发了相同的广播问题,因此不得不使用 .zip 函数将标签组合回来。谢谢你的解释! 【参考方案1】:使用默认 Py4J 网关进行通信是不可能的。要了解为什么我们必须查看 PySpark 内部文档 [1] 中的下图:
由于 Py4J 网关在驱动程序上运行,因此通过套接字与 JVM 工作人员通信的 Python 解释器无法访问它(参见例如 PythonRDD
/ rdd.py
)。
理论上可以为每个工作人员创建一个单独的 Py4J 网关,但实际上它不太可能有用。忽略可靠性等问题 Py4J 根本不是为执行数据密集型任务而设计的。
有什么解决方法吗?
使用Spark SQL Data Sources API 包装JVM 代码。
优点:受支持,高级别的,不需要访问内部 PySpark API
缺点:相对冗长且没有很好的文档记录,主要限于输入数据
使用 Scala UDF 对 DataFrame 进行操作。
优点:易于实现(请参阅Spark: How to map Python with Scala or Java User Defined Functions?),如果数据已经存储在 DataFrame 中,则无需在 Python 和 Scala 之间进行数据转换,对 Py4J 的访问最少
缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持
以类似于在 MLlib 中完成的方式创建高级 Scala 接口。
优点:灵活,能够执行任意复杂代码。它可以直接在 RDD 上使用(例如参见 MLlib model wrappers)或使用 DataFrames
(参见 How to use a Scala class inside Pyspark)。后一种解决方案似乎更友好,因为所有 ser-de 细节都已由现有 API 处理。
缺点:低级,需要数据转换,和UDF一样需要访问Py4J和内部API,不支持
一些基本的例子可以在Transforming PySpark RDD with Scala找到
使用外部工作流管理工具在 Python 和 Scala / Java 作业之间切换并将数据传递到 DFS。
优点:易于实施,对代码本身的更改最少
缺点:读取/写入数据的成本 (Alluxio?)
使用共享的SQLContext
(参见例如Apache Zeppelin 或Livy)使用已注册的临时表在来宾语言之间传递数据。
优点:非常适合交互式分析
缺点:对于批处理作业(Zeppelin)来说不是很多,或者可能需要额外的编排(Livy)
-
约书亚·罗森。 (2014 年 8 月 4 日)PySpark Internals。取自https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
【讨论】:
以上是关于scala中怎样调用unapplyseq的主要内容,如果未能解决你的问题,请参考以下文章