Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?
Posted
技术标签:
【中文标题】Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?【英文标题】:Spark: Unit Test - I have one function that unions 3 input datasets. Should I do Unit test on them? 【发布时间】:2019-09-23 16:15:56 【问题描述】:我写了一段代码如下
Object Cal
def mergedatasets(df: Dataset[Row], df1: Dataset[Row],df2: Dataset[Row]):Dataset[Row]=
df.union(df1).union(df2)
//other logic
object readDataframes
def readFirstDF(spark:SparkSession):Dataset[Row]=
spark.read.json(somefile)
def readSecondDF(spark:SparkSession):Dataset[Row]=
spark.read.json(somefile)
def readThirdDF(spark:SparkSession):Dataset[Row]=
spark.read.json(somefile)
在上面的代码中,我正在读取 3 个文件,然后将它们合并到一个文件中,以便进一步处理。 基于上述情况,我的问题如下:
-
对函数 mergdatasets 进行单元测试有意义吗?如果是,要测试的基本/最少的事情是什么?如何检查极端情况(如果有)?
对 readDataframes 进行单元测试是否有意义?如果是,要测试什么?是否要检查推断的架构是否符合预期?还有什么?
我也想将上述问题扩展到以下功能
def timeIntervalAgg(df: Dataset[Row]): Dataset[Row] =
val timeInterval = df
.groupBy("id","page_number")
.agg(sum("timeInterval").alias("timeInterval"))
timeIntervalAgg
def timeInterval(df: Dataset[Row]): Dataset[Row] =
val windowSpec = Window.partitionBy("id").orderBy("date_time")
val timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
val endTime = lead(col("date_time"),1).over(windowSpec)
val startTime = col("date_time")
val timeDiff = (unix_timestamp(endTime, timeFmt)
- unix_timestamp(startTime, timeFmt))
val timeInterval = df
.withColumn("timeInterval", lit(when(col("event") === "this_event",lit(null)
.cast("long"))
.otherwise(timeDiff)))
.where("""event != "this_event" """)
timeInterval
def addOddpages(df: Dataset[Row]) :Dataset[Row] =
val odd = df
.where("""view_mode = "twin" """)
.withColumn("page_odd", col("page") + 1)
.drop("page")
.select(col("id"), col("date_time")
.cast("timestamp"),col("page_odd")
.alias("page"), col("page_view_mode"),
col("event"),col("timeInterval"))
val timeIntervalWithoddPage = df.union(odd)
timeIntervalWithoddPage
请建议是否需要以更好的方式重构代码 以实现更好的测试。
我的目标是了解要测试什么?需要注意什么 为上面的代码编写测试?所有这些问题都是针对 Spark 代码单元测试而不是其他语言代码测试。
如何在没有冗余测试火花的情况下进行单元测试 测试了吗? 是否需要像这样测试每个功能(因为逻辑/代码不是很复杂)还是最好测试 按正确顺序组合上述功能的功能。通过做 那么它可以称为单元测试吗? 请随时分享一些您可能编写的示例单元测试 上面的代码。【问题讨论】:
我不会对这些方法进行单元测试,union
和 spark.read
应该由 spark
自己进行单元测试。您可以测试的是参数为空时的行为,即您可以测试是否引发了适当的异常
readDataframes
方法没有多大意义,为什么他们将Dataframe
作为输入,我希望他们采用文件路径/名称(字符串)?
是的,让我更正一下已读语句
【参考方案1】:
读取 JSON 文件:如果您只是读取 JSON 文件,则无需对此进行测试。
此外,最好阅读schema()
中具有显式模式的文件,以避免推断模式的一些问题。此外,您不需要 3 种相同的方法来读取文件。
Union Datasets:从 Spark 2.3.0 开始,就有了 unionByName()
函数。
该函数按名称(而不是按位置)解析列。当您的 DataFrame 具有不同的列顺序时,您可以考虑使用这些函数来避免联合问题。当然,这个功能不需要测试。 mergedatasets()
方法中的//other logic
代码很难说。
对于单元测试,您可以使用 ScalaTest 或其他工具。
使用master("local")
创建 SparkSession;
使用预期数据创建一个 DataFrame;
为您要测试的每个方法创建一个输入 DataFrame。;
比较预期和实际的 DataFrame;
以下项目可能有用。您可以在那里找到如何比较两个 DataFrame。此外,README 中有几个示例:https://github.com/MrPowers/spark-fast-tests
【讨论】:
以上是关于Spark:单元测试 - 我有一个联合 3 个输入数据集的函数。我应该对它们进行单元测试吗?的主要内容,如果未能解决你的问题,请参考以下文章