Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?

Posted

技术标签:

【中文标题】Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?【英文标题】:Spark: Unit Test - I have one function that unions 3 input datasets. Should I do Unit test on them? 【发布时间】:2019-09-23 16:15:56 【问题描述】:

我写了一段代码如下

Object Cal
def mergedatasets(df: Dataset[Row], df1: Dataset[Row],df2: Dataset[Row]):Dataset[Row]=
 df.union(df1).union(df2)
//other logic




object readDataframes
def readFirstDF(spark:SparkSession):Dataset[Row]=
 spark.read.json(somefile)

def readSecondDF(spark:SparkSession):Dataset[Row]=
 spark.read.json(somefile)

def readThirdDF(spark:SparkSession):Dataset[Row]=
 spark.read.json(somefile)


在上面的代码中,我正在读取 3 个文件,然后将它们合并到一个文件中,以便进一步处理。 基于上述情况,我的问题如下:

    对函数 mergdatasets 进行单元测试有意义吗?如果是,要测试的基本/最少的事情是什么?如何检查极端情况(如果有)? 对 readDataframes 进行单元测试是否有意义?如果是,要测试什么?是否要检查推断的架构是否符合预期?还有什么?

我也想将上述问题扩展到以下功能

def timeIntervalAgg(df: Dataset[Row]): Dataset[Row] = 

    val timeInterval = df
      .groupBy("id","page_number")
      .agg(sum("timeInterval").alias("timeInterval"))
    timeIntervalAgg

  

  def timeInterval(df: Dataset[Row]): Dataset[Row] =

    val windowSpec = Window.partitionBy("id").orderBy("date_time")
    val timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
    val endTime = lead(col("date_time"),1).over(windowSpec)
    val startTime = col("date_time")
    val timeDiff = (unix_timestamp(endTime, timeFmt)
      - unix_timestamp(startTime, timeFmt))
    val timeInterval = df
      .withColumn("timeInterval", lit(when(col("event") === "this_event",lit(null)
        .cast("long"))
        .otherwise(timeDiff)))
      .where("""event != "this_event" """)
    timeInterval

  

  def addOddpages(df: Dataset[Row]) :Dataset[Row] = 

    val odd = df
      .where("""view_mode = "twin" """)
      .withColumn("page_odd", col("page") + 1)
      .drop("page")
      .select(col("id"), col("date_time")
        .cast("timestamp"),col("page_odd")
        .alias("page"), col("page_view_mode"),
        col("event"),col("timeInterval"))
    val timeIntervalWithoddPage = df.union(odd)
    timeIntervalWithoddPage

  

请建议是否需要以更好的方式重构代码 以实现更好的测试。

我的目标是了解要测试什么?需要注意什么 为上面的代码编写测试?所有这些问题都是针对 Spark 代码单元测试而不是其他语言代码测试。

如何在没有冗余测试火花的情况下进行单元测试 测试了吗? 是否需要像这样测试每个功能(因为逻辑/代码不是很复杂)还是最好测试 按正确顺序组合上述功能的功能。通过做 那么它可以称为单元测试吗? 请随时分享一些您可能编写的示例单元测试 上面的代码。

【问题讨论】:

我不会对这些方法进行单元测试,unionspark.read 应该由 spark 自己进行单元测试。您可以测试的是参数为空时的行为,即您可以测试是否引发了适当的异常 readDataframes 方法没有多大意义,为什么他们将Dataframe 作为输入,我希望他们采用文件路径/名称(字符串)? 是的,让我更正一下已读语句 【参考方案1】:

读取 JSON 文件:如果您只是读取 JSON 文件,则无需对此进行测试。 此外,最好阅读schema() 中具有显式模式的文件,以避免推断模式的一些问题。此外,您不需要 3 种相同的方法来读取文件。

Union Datasets:从 Spark 2.3.0 开始,就有了 unionByName() 函数。 该函数按名称(而不是按位置)解析列。当您的 DataFrame 具有不同的列顺序时,您可以考虑使用这些函数来避免联合问题。当然,这个功能不需要测试。 mergedatasets() 方法中的//other logic 代码很难说。

对于单元测试,您可以使用 ScalaTest 或其他工具。

使用master("local") 创建 SparkSession; 使用预期数据创建一个 DataFrame; 为您要测试的每个方法创建一个输入 DataFrame。; 比较预期和实际的 DataFrame;

以下项目可能有用。您可以在那里找到如何比较两个 DataFrame。此外,README 中有几个示例:https://github.com/MrPowers/spark-fast-tests

【讨论】:

以上是关于Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 单元测试(在 intellij 中)

.NET Apache Spark 的单元测试

在火花中有效地使用联合

C++ - 包含数组的联合

spark如何加载大于集群磁盘大小的输入文件?

Spark程序进行单元测试-使用scala