单元测试火花数据帧转换链接

Posted

技术标签:

【中文标题】单元测试火花数据帧转换链接【英文标题】:Unit Testing spark dataframes transformation chaining 【发布时间】:2019-01-28 13:38:31 【问题描述】:

我对 scala spark 生态系统非常陌生,想知道对链式数据帧转换进行单元测试的最佳方法是什么。所以这里是我想测试的方法的代码示例

def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2:DataType2): Unit 
    dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic
        .write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
        .parquet("some hdfs/s3/url")        
 

问题是 parquet 是 Unit 返回类型,这使得测试变得困难。 转换本质上是不可变的,这使得模拟和监视变得有点困难,这一事实进一步放大了这个问题

为了创建数据框,我在 csv 中转储了测试数据集

【问题讨论】:

你检查了我下面的答案吗?如果不适合您的情况,请告诉我如何帮助将其适合您的情况 您好,如果您可以修改我的示例以便我可以更紧密地联系到它,将会很有帮助。 请立即查看。我相信它可以满足您的需求。 你检查我的答案了吗? 嗨,我检查了,我仍然对写入 Parquet 文件感到困惑。目前,我正在尝试模拟 DataFrame 并验证该方法被调用的次数。编写 shell 脚本并不是一个理想的解决方案 【参考方案1】:

请找到简单的数据框单元测试示例。你可以把它分成两部分。第一的。测试转换,你可以做简单的shell脚本来测试写入的文件

import com.holdenkarau.spark.testing._
import org.apache.spark.sql.DataFrame, Row
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
import org.scalatest.FunSuite, Matchers

class SomeDFTest extends FunSuite with Matchers with DataFrameSuiteBase    
 import spark.implicits._

  test("Testing Input customer data date transformation") 


    val inputSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false)
    )
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)

    )
    val inputData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )

    val expectedData = Seq(
      Row (8, "bat","test"),
      Row(64, "mouse","test"),
      Row(-27, "horse","test")
    )

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(inputData),
      StructType(inputSchema)
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )


    val actual = transformSomeDf(inputDF)

    assertDataFrameEquals(actual, expectedDF) // equal



  

  def transformSomeDf(df:DataFrame):DataFrame=
    df.withColumn("dummyColumn",lit("test"))
  

Sbt.build 配置

name := "SparkTest"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.holdenkarau" %% "spark-testing-base" % "2.4.0_0.11.0" % Test

)

【讨论】:

【参考方案2】:

我在测试数据框时想到的第一件事就是将转换和 IO 分开

所以对于上述场景 我们可以把上面的链分成三部分

class Coordinator 
    def transformAndWrite(dataframe: Dataframe): Unit = 
transformedDf = dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic
partitionedDfWriter = transformedDf.write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")

partitionedDfWriter.parquet("some hdfs/s3/url")

现在我们可以将它们移动到三个单独的类中,

DFTransformer , DFPartitionerDataFrameParquetWriter extends ResourceWriter

所以代码会变成这样

class DFTransformer 
    def transform(dataframe:DataFrame): Dataframe = 
        return dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic


class DfPartitioner 
    def partition(dataframe: DataFrame): DataFrameWriter = 
        return dataframe.write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
    


class DataFrameParquetWriter extends ResourceWriter 
    overide def write(partitionedDfWriter: DataFrameWriter) = 
       partitionedDfWriter.parquet("some hdfs/s3/url") 

    

class Coordinator(dfTransformer:DfTransformer, dfPartitioner: DFPartitioner, resourceWriter: ResourceWriter) 
    val transformedDf = dfTransformer.transform(dataframe)
    val partitionedDfWriter = dfPartitioner.partition(transformedDf)
    resourceWriter.write(partitionedDfWriter)

上面的好处是当你必须测试你的 Coordinator 类时,你可以很容易地使用Mockito 来模拟你的依赖。

现在测试DFTransformer 也很容易, 您可以传递一个存根数据帧并断言返回的数据帧。(使用 spark-testing-base)。我们还可以测试转换返回的列。我们也可以测试计数

【讨论】:

以上是关于单元测试火花数据帧转换链接的主要内容,如果未能解决你的问题,请参考以下文章

在 Swift 单元测试中,我在 App 和单元测试目标之间遇到类转换错误

Python单元测试:在单元测试用例中转换旧测试

数据流单元测试

将熊猫数据帧转换为火花数据帧时收到错误

无法将 aws 胶水动态帧转换为火花数据帧

单元测试读取数据库失败是啥原因