在 scala 中导入 spark.implicits._
Posted
技术标签:
【中文标题】在 scala 中导入 spark.implicits._【英文标题】:Importing spark.implicits._ in scala 【发布时间】:2016-08-25 17:17:11 【问题描述】:我正在尝试导入 spark.implicits._ 显然,这是 scala 类中的一个对象。 当我以这样的方法导入它时:
def f() =
val spark = SparkSession()....
import spark.implicits._
它工作正常,但是我正在编写一个测试类,我想让这个导入可用于所有测试 我试过了:
class SomeSpec extends FlatSpec with BeforeAndAfter
var spark:SparkSession = _
//This won't compile
import spark.implicits._
before
spark = SparkSession()....
//This won't either
import spark.implicits._
"a test" should "run" in
//Even this won't compile (although it already looks bad here)
import spark.implicits._
//This was the only way i could make it work
val spark = this.spark
import spark.implicits._
这不仅看起来很糟糕,我不想每次测试都这样做 什么是“正确”的做法?
【问题讨论】:
为什么不在文件顶部?通常所有的进口都在那里 也试过了,忘了写代码,但显然不可能,因为“implicits”是“spark”类中的一个对象,需要先实例化 【参考方案1】:您可以执行类似于 Spark 测试套件中的操作。例如这会起作用(灵感来自SQLTestData
):
class SomeSpec extends FlatSpec with BeforeAndAfter self =>
var spark: SparkSession = _
private object testImplicits extends SQLImplicits
protected override def _sqlContext: SQLContext = self.spark.sqlContext
import testImplicits._
before
spark = SparkSession.builder().master("local").getOrCreate()
"a test" should "run" in
// implicits are working
val df = spark.sparkContext.parallelize(List(1,2,3)).toDF()
您也可以直接使用SharedSQLContext
之类的东西,它提供testImplicits: SQLImplicits
,即:
class SomeSpec extends FlatSpec with SharedSQLContext
import testImplicits._
// ...
【讨论】:
【参考方案2】:我认为SparkSession.scala文件中的GitHub代码可以给你一个很好的提示:
/**
* :: Experimental ::
* (Scala-specific) Implicit methods available in Scala for converting
* common Scala objects into [[DataFrame]]s.
*
*
* val sparkSession = SparkSession.builder.getOrCreate()
* import sparkSession.implicits._
*
*
* @since 2.0.0
*/
@Experimental
object implicits extends SQLImplicits with Serializable
protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext
“spark.implicits._”中的“spark”就是我们创建的 sparkSession 对象。
Here 是另一个参考!
【讨论】:
【参考方案3】:我只是实例化 SparkSession 并在使用之前,“导入隐式”。
@transient lazy val spark = SparkSession
.builder()
.master("spark://master:7777")
.getOrCreate()
import spark.implicits._
【讨论】:
【参考方案4】:感谢@bluenote10 提供有用的答案,我们可以再次简化它,例如没有帮助对象testImplicits
:
private object testImplicits extends SQLImplicits
protected override def _sqlContext: SQLContext = self.spark.sqlContext
通过以下方式:
trait SharedSparkSession extends BeforeAndAfterAll self: Suite =>
/**
* The SparkSession instance to use for all tests in one suite.
*/
private var spark: SparkSession = _
/**
* Returns local running SparkSession instance.
* @return SparkSession instance `spark`
*/
protected def sparkSession: SparkSession = spark
/**
* A helper implicit value that allows us to import SQL implicits.
*/
protected lazy val sqlImplicits: SQLImplicits = self.sparkSession.implicits
/**
* Starts a new local spark session for tests.
*/
protected def startSparkSession(): Unit =
if (spark == null)
spark = SparkSession
.builder()
.master("local[2]")
.appName("Testing Spark Session")
.getOrCreate()
/**
* Stops existing local spark session.
*/
protected def stopSparkSession(): Unit =
if (spark != null)
spark.stop()
spark = null
/**
* Runs before all tests and starts spark session.
*/
override def beforeAll(): Unit =
startSparkSession()
super.beforeAll()
/**
* Runs after all tests and stops existing spark session.
*/
override def afterAll(): Unit =
super.afterAll()
stopSparkSession()
最后我们可以使用SharedSparkSession
进行单元测试并导入sqlImplicits
:
class SomeSuite extends FunSuite with SharedSparkSession
// We can import sql implicits
import sqlImplicits._
// We can use method sparkSession which returns locally running spark session
test("some test")
val df = sparkSession.sparkContext.parallelize(List(1,2,3)).toDF()
//...
【讨论】:
【参考方案5】:好吧,我一直在每个调用的方法中重新使用现有的 SparkSession.. 通过在方法中创建本地 val -
val spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession.active
然后
import spark.implicits._
【讨论】:
【参考方案6】:创建一个 sparksession 对象并在您想要将任何 rdd 转换为数据集之前使用 spark.implicit._。
像这样:
val spark = SparkSession
.builder
.appName("SparkSQL")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val someDataset = someRdd.toDS
【讨论】:
【参考方案7】:这与在 scala 中使用 val vs var 有关系。
例如以下不起作用
var sparkSession = new SparkSession.Builder().appName("my-app").config(sparkConf).getOrCreate
import sparkSession.implicits._
但以下确实
sparkSession = new SparkSession.Builder().appName("my-app").config(sparkConf).getOrCreate
val sparkSessionConst = sparkSession
import sparkSessionConst.implicits._
我对 scala 非常熟悉,所以我只能猜测原因与为什么我们只能在 java 的闭包中使用声明为 final 的外部变量相同。
【讨论】:
【参考方案8】:我知道这是旧帖子,但只是想分享我对此的看法时间。所以它不允许在其上导入隐含,因为它可能会导致歧义,因为在后期可以更改它,因为在 val 的情况下它不一样
【讨论】:
以上是关于在 scala 中导入 spark.implicits._的主要内容,如果未能解决你的问题,请参考以下文章