scala中怎样调用unapplyseq

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了scala中怎样调用unapplyseq相关的知识,希望对你有一定的参考价值。

参考技术A scala里的object一般特指的是伴生对象,可以通过对象名直接调用其中的成员,类似Java中的static成员,如果不在当前作用域,需要import。

从任务中调用 Java/Scala 函数

【中文标题】从任务中调用 Java/Scala 函数【英文标题】:Calling Java/Scala function from a task 【发布时间】:2015-10-19 12:18:22 【问题描述】:

背景

我最初的问题是为什么在 map 函数中使用 DecisionTreeModel.predict 会引发异常? 并且与 How to generate tuples of (original lable, predicted label) on Spark with MLlib? 有关

当我们使用 Scala API a recommended way 获得对 RDD[LabeledPoint] 的预测时,使用 DecisionTreeModel 是简单地映射到 RDD

val labelAndPreds = testData.map  point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)

不幸的是,PySpark 中的类似方法效果不佳:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。如需更多信息,请参阅SPARK-5063。

而不是 official documentation 推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

那么这里发生了什么?这里没有广播变量,Scala API 定义predict 如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = 
  topNode.predict(features)


/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = 
  features.map(x => predict(x))

所以至少乍一看,从动作或转换调用不是问题,因为预测似乎是一种本地操作。

说明

经过一番挖掘,我发现问题的根源是从DecisionTreeModel.predict 调用的JavaModelWrapper.call 方法。调用Java函数需要accessSparkContext

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

问题

DecisionTreeModel.predict 的情况下,有一个推荐的解决方法,并且所有必需的代码都已经是 Scala API 的一部分,但是一般来说有什么优雅的方法来处理这样的问题吗?

目前只有我能想到的比较重量级的解决方案:

通过隐式转换扩展 Spark 类或添加某种包装器,将所有内容推送到 JVM 直接使用 Py4j 网关

【问题讨论】:

这部分是正确的。我在将 Scala 中的相同代码实现放到 Python 中以用于决策树时遇到了同样的麻烦,并引发了相同的广播问题,因此不得不使用 .zip 函数将标签组合回来。谢谢你的解释! 【参考方案1】:

使用默认 Py4J 网关进行通信是不可能的。要了解为什么我们必须查看 PySpark 内部文档 [1] 中的下图:

由于 Py4J 网关在驱动程序上运行,因此通过套接字与 JVM 工作人员通信的 Python 解释器无法访问它(参见例如 PythonRDD / rdd.py)。

理论上可以为每个工作人员创建一个单独的 Py4J 网关,但实际上它不太可能有用。忽略可靠性等问题 Py4J 根本不是为执行数据密集型任务而设计的。

有什么解决方法吗?

    使用Spark SQL Data Sources API 包装JVM 代码。

    优点:受支持,高级别的,不需要访问内部 PySpark API

    缺点:相对冗长且没有很好的文档记录,主要限于输入数据

    使用 Scala UDF 对 DataFrame 进行操作。

    优点:易于实现(请参阅Spark: How to map Python with Scala or Java User Defined Functions?),如果数据已经存储在 DataFrame 中,则无需在 Python 和 Scala 之间进行数据转换,对 Py4J 的访问最少

    缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持

    以类似于在 MLlib 中完成的方式创建高级 Scala 接口。

    优点:灵活,能够执行任意复杂代码。它可以直接在 RDD 上使用(例如参见 MLlib model wrappers)或使用 DataFrames(参见 How to use a Scala class inside Pyspark)。后一种解决方案似乎更友好,因为所有 ser-de 细节都已由现有 API 处理。

    缺点:低级,需要数据转换,和UDF一样需要访问Py4J和内部API,不支持

    一些基本的例子可以在Transforming PySpark RDD with Scala找到

    使用外部工作流管理工具在 Python 和 Scala / Java 作业之间切换并将数据传递到 DFS。

    优点:易于实施,对代码本身的更改最少

    缺点:读取/写入数据的成本 (Alluxio?)

    使用共享的SQLContext(参见例如Apache Zeppelin 或Livy)使用已注册的临时表在来宾语言之间传递数据。

    优点:非常适合交互式分析

    缺点:对于批处理作业(Zeppelin)来说不是很多,或者可能需要额外的编排(Livy)


    约书亚·罗森。 (2014 年 8 月 4 日)PySpark Internals。取自https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

【讨论】:

以上是关于scala中怎样调用unapplyseq的主要内容,如果未能解决你的问题,请参考以下文章

怎样在scala正则表达式提取器中使用小括号

scala中的传名调用

scala在Java中调用scala方法

scala在Java中调用scala方法

Scala方法调用中的花括号[重复]

在 Scala 中调用其他对象的主要方法