为啥 spark 作业服务器中不支持带有 namedObject 的 sparkSession?

Posted

技术标签:

【中文标题】为啥 spark 作业服务器中不支持带有 namedObject 的 sparkSession?【英文标题】:Why there is no support for sparkSession with namedObject in spark job server?为什么 spark 作业服务器中不支持带有 namedObject 的 sparkSession? 【发布时间】:2017-10-09 06:10:27 【问题描述】:

我正在尝试使用 spark 作业服务器 API(适用于 spark 2.2.0)构建应用程序。但是我发现sparkSession不支持namedObject。 我的样子:

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.scalactic._
import spark.jobserver.NamedDataFrame, NamedObjectSupport, SparkSessionJob
import spark.jobserver.api.JobEnvironment, SingleProblem, ValidationProblem

import scala.util.Try

object word1 extends SparkSessionJob with NamedObjectSupport 
  type JobData = Seq[String]
  type JobOutput = String

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput =

  val df = sparkSession.sparkContext.parallelize(data)
  val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY)
  this.namedObjects.update("df1", ndf)
  this.namedObjects.getNames().toString



 def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = 
Try(config.getString("input.string").split(" ").toSeq)
  .map(words => Good(words))
  .getOrElse(Bad(One(SingleProblem("No input.string param"))))
     


但是 this.namedObjects.update() 行有错误。我认为他们不支持namedObject。当使用 SparkJob 编译相同的代码时:

object word1 extends SparkJob with NamedObjectSupport 

是否支持带有 sparksession 的 namedObjects ?如果没有,那么持久化数据帧/数据集的解决方法是什么?

【问题讨论】:

【参考方案1】:

我想通了。这是我这边的愚蠢错误。来自https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/scala/spark/jobserver/NamedObjectSupport.scala#L138。正如它所说:

// 由于 api.SparkJobBase 中的 JobEnvironment,不再需要 NamedObjectSupport。也是 // 自动导入旧的 spark.jobserver.SparkJobBase 以实现兼容性。

@Deprecated
trait NamedObjectSupport

因此,要访问这些功能,我们需要将此代码修改为:

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel
import org.scalactic._
import spark.jobserver.NamedDataFrame, NamedObjectSupport, SparkSessionJob
import spark.jobserver.api.JobEnvironment, SingleProblem, ValidationProblem

import scala.util.Try

object word1 extends SparkSessionJob with NamedObjectSupport 
  type JobData = Seq[String]
  type JobOutput = String

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput =
  
   val df = sparkSession.sparkContext.parallelize(data)
   val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY)
   runtime.namedObjects.update("df1", ndf)
   runtime.namedObjects.getNames().toString
  


 def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = 
 Try(config.getString("input.string").split(" ").toSeq)
   .map(words => Good(words))
   .getOrElse(Bad(One(SingleProblem("No input.string param"))))
    

 

【讨论】:

以上是关于为啥 spark 作业服务器中不支持带有 namedObject 的 sparkSession?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 spark.read.parquet() 运行 2 个作业?

Spark SQL:为啥一个查询有两个作业?

为啥只有一个 spark 作业只使用一个执行器运行?

带有 partitionBy 的 Spark DataFrame saveAsTable 在 HDFS 中不创建 ORC 文件

为啥“构建配置...”在带有 QT eclipse 插件的 Eclipse CDT 中不可用?

为啥外部表在 netezza 中不起作用?