在 scala 中导入 spark.implicits._

Posted

技术标签:

【中文标题】在 scala 中导入 spark.implicits._【英文标题】:Importing spark.implicits._ in scala 【发布时间】:2016-08-25 17:17:11 【问题描述】:

我正在尝试导入 spark.implicits._ 显然,这是 scala 类中的一个对象。 当我以这样的方法导入它时:

def f() = 
  val spark = SparkSession()....
  import spark.implicits._

它工作正常,但是我正在编写一个测试类,我想让这个导入可用于所有测试 我试过了:

class SomeSpec extends FlatSpec with BeforeAndAfter 
  var spark:SparkSession = _

  //This won't compile
  import spark.implicits._

  before 
    spark = SparkSession()....
    //This won't either
    import spark.implicits._
  

  "a test" should "run" in 
    //Even this won't compile (although it already looks bad here)
    import spark.implicits._

    //This was the only way i could make it work
    val spark = this.spark
    import spark.implicits._
  

这不仅看起来很糟糕,我不想每次测试都这样做 什么是“正确”的做法?

【问题讨论】:

为什么不在文件顶部?通常所有的进口都在那里 也试过了,忘了写代码,但显然不可能,因为“implicits”是“spark”类中的一个对象,需要先实例化 【参考方案1】:

您可以执行类似于 Spark 测试套件中的操作。例如这会起作用(灵感来自SQLTestData):

class SomeSpec extends FlatSpec with BeforeAndAfter  self =>

  var spark: SparkSession = _

  private object testImplicits extends SQLImplicits 
    protected override def _sqlContext: SQLContext = self.spark.sqlContext
  
  import testImplicits._

  before 
    spark = SparkSession.builder().master("local").getOrCreate()
  

  "a test" should "run" in 
    // implicits are working
    val df = spark.sparkContext.parallelize(List(1,2,3)).toDF()
  

您也可以直接使用SharedSQLContext 之类的东西,它提供testImplicits: SQLImplicits,即:

class SomeSpec extends FlatSpec with SharedSQLContext 
  import testImplicits._

  // ...


【讨论】:

【参考方案2】:

我认为SparkSession.scala文件中的GitHub代码可以给你一个很好的提示:

      /**
       * :: Experimental ::
       * (Scala-specific) Implicit methods available in Scala for converting
       * common Scala objects into [[DataFrame]]s.
       *
       * 
       *   val sparkSession = SparkSession.builder.getOrCreate()
       *   import sparkSession.implicits._
       * 
       *
       * @since 2.0.0
       */
      @Experimental
      object implicits extends SQLImplicits with Serializable 
        protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext
      

“spark.implicits._”中的“spark”就是我们创建的 sparkSession 对象。

Here 是另一个参考!

【讨论】:

【参考方案3】:

我只是实例化 SparkSession 并在使用之前,“导入隐式”。

@transient lazy val spark = SparkSession
  .builder()
  .master("spark://master:7777")
  .getOrCreate()

import spark.implicits._

【讨论】:

【参考方案4】:

感谢@bluenote10 提供有用的答案,我们可以再次简化它,例如没有帮助对象testImplicits

private object testImplicits extends SQLImplicits 
  protected override def _sqlContext: SQLContext = self.spark.sqlContext

通过以下方式:

trait SharedSparkSession extends BeforeAndAfterAll  self: Suite =>

  /**
   * The SparkSession instance to use for all tests in one suite.
   */
  private var spark: SparkSession = _

  /**
   * Returns local running SparkSession instance.
   * @return SparkSession instance `spark`
   */
  protected def sparkSession: SparkSession = spark

  /**
   * A helper implicit value that allows us to import SQL implicits.
   */
  protected lazy val sqlImplicits: SQLImplicits = self.sparkSession.implicits

  /**
   * Starts a new local spark session for tests.
   */
  protected def startSparkSession(): Unit = 
    if (spark == null) 
      spark = SparkSession
        .builder()
        .master("local[2]")
        .appName("Testing Spark Session")
        .getOrCreate()
    
  

  /**
   * Stops existing local spark session.
   */
  protected def stopSparkSession(): Unit = 
    if (spark != null) 
      spark.stop()
      spark = null
    
  

  /**
   * Runs before all tests and starts spark session.
   */
  override def beforeAll(): Unit = 
    startSparkSession()
    super.beforeAll()
  

  /**
   * Runs after all tests and stops existing spark session.
   */
  override def afterAll(): Unit = 
    super.afterAll()
    stopSparkSession()
  

最后我们可以使用SharedSparkSession 进行单元测试并导入sqlImplicits

class SomeSuite extends FunSuite with SharedSparkSession 
  // We can import sql implicits 
  import sqlImplicits._

  // We can use method sparkSession which returns locally running spark session
  test("some test") 
    val df = sparkSession.sparkContext.parallelize(List(1,2,3)).toDF()
    //...
  

【讨论】:

【参考方案5】:

好吧,我一直在每个调用的方法中重新使用现有的 SparkSession.. 通过在方法中创建本地 val -

val spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession.active

然后

import spark.implicits._

【讨论】:

【参考方案6】:

创建一个 sparksession 对象并在您想要将任何 rdd 转换为数据集之前使用 spark.implicit._。

像这样:

val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .getOrCreate()

import spark.implicits._
val someDataset = someRdd.toDS

【讨论】:

【参考方案7】:

这与在 scala 中使用 val vs var 有关系。

例如以下不起作用

var sparkSession = new SparkSession.Builder().appName("my-app").config(sparkConf).getOrCreate
import sparkSession.implicits._

但以下确实

sparkSession = new SparkSession.Builder().appName("my-app").config(sparkConf).getOrCreate
val sparkSessionConst = sparkSession
import sparkSessionConst.implicits._

我对 scala 非常熟悉,所以我只能猜测原因与为什么我们只能在 java 的闭包中使用声明为 final 的外部变量相同。

【讨论】:

【参考方案8】:

我知道这是旧帖子,但只是想分享我对此的看法时间。所以它不允许在其上导入隐含,因为它可能会导致歧义,因为在后期可以更改它,因为在 val 的情况下它不一样

【讨论】:

以上是关于在 scala 中导入 spark.implicits._的主要内容,如果未能解决你的问题,请参考以下文章

在 Ammonite (scala) 中重新导入脚本

是否可以在 Scala 3 中导出***不透明类型?

在 index.html 中导入 css 和在 Angular 5 中导入 styleUrls 的区别

在 NextJS 中导入 SVG

在 python 类中导入模块

如何在vue.js项目的main.js文件中导入js类并在所有组件中使用它而不是在每个组件中导入?