使用python对pyspark代码进行单元测试

Posted

技术标签:

【中文标题】使用python对pyspark代码进行单元测试【英文标题】:Unit test pyspark code using python 【发布时间】:2018-03-22 04:37:16 【问题描述】:

我在pyspark 中有脚本,如下所示。我想在这个脚本中对function 进行单元测试。

def rename_chars(column_name):
    chars = ((' ', '_&'), ('.', '_$'))
    new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
    return new_cols


def column_names(df):
    changed_col_names = df.schema.names
    for cols in changed_col_names:
        df = df.withColumnRenamed(cols, rename_chars(cols))
    return df   

我写了如下unittest 来测试功能。

但我不知道如何提交unittest。我已经完成了spark-submit,它什么也没做。

import unittest
from my_script import column_names

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)


class RenameColumnNames(unittest.TestCase):
    def test_column_names(self):
        df1 = column_names(df)
        result = df1.schema.names
        expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
        self.assertEqual(result, expected)

如何将此脚本集成为unittest

我可以在安装了pyspark 的节点上运行什么?

【问题讨论】:

unittest 问题似乎在本地机器上解决了,如何使用pip anaconda 在服务器上创建一个virtualenv 是另一个主题,您可以创建一个不同的线程来制作服务器上的安装、测试和开发, 【参考方案1】:

Pyspark 单元测试指南

1.您需要从站点download Spark 分发并解压。或者,如果您已经拥有 Spark 和 Python 的工作发行版,只需安装 pysparkpip install pyspark

2.根据需要设置这样的系统变量:

export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"

我在主目录的 .profile 中添加了这个。 如果您已经有 Spark 的有效分发版,则可以设置此变量。

3.另外你可能需要设置:

PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"

Python 和罐子?是的。 Pyspark uses py4j 与 Spark 的 java 部分进行通信。如果你想解决更复杂的情况,比如run Kafka server with tests in Python,或者像例子中那样使用来自 Scala 的 TestHiveContext,你应该指定 jars。 我是通过 Idea 运行配置环境变量做到的。

4.您可以使用pyspark/tests.pypyspark/streaming/tests.pypyspark/sql/tests.pypyspark/ml/tests.pypyspark/mllib/tests.pyscripts,其中包含用于测试 pyspark 应用程序的各种 TestCase 类和示例。在您的情况下,您可以这样做(来自 pyspark/sql/tests.py 的示例):

class HiveContextSQLTests(ReusedPySparkTestCase):

    @classmethod
    def setUpClass(cls):
        ReusedPySparkTestCase.setUpClass()
        cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
        try:
            cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
        except py4j.protocol.Py4JError:
            cls.tearDownClass()
            raise unittest.SkipTest("Hive is not available")
        except TypeError:
            cls.tearDownClass()
            raise unittest.SkipTest("Hive is not available")
        os.unlink(cls.tempdir.name)
        _scala_HiveContext =\
            cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
        cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
        cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
        cls.df = cls.sc.parallelize(cls.testData).toDF()

    @classmethod
    def tearDownClass(cls):
        ReusedPySparkTestCase.tearDownClass()
        shutil.rmtree(cls.tempdir.name, ignore_errors=True)

但您需要在 PYSPARK_SUBMIT_ARGS 中指定 --jars 和 Hive 库,如前所述

或者没有 Hive:

class SQLContextTests(ReusedPySparkTestCase):
    def test_get_or_create(self):
        sqlCtx = SQLContext.getOrCreate(self.sc)
        self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)

据我所知,如果 pyspark 是通过 pip 安装的,您没有在示例中描述的 tests.py。 在这种情况下,只需从 Spark 站点下载分发并复制代码示例。

现在您可以正常运行您的 TestCase:python -m unittest test.py

更新: 由于不推荐使用 HiveContext 和 SqlContext 的 spark 2.3。 您可以使用 SparkSession Hive API。

【讨论】:

对我来说,问题是我无法在我的 edgenode 中安装 spark。我在我的edgenode 中安装了native python,cloudera 提供了anaconda 用于pyspark 我想使用cloudera 提供的spark 在边缘节点上运行unittest 只是好奇,这里相关的jar文件是什么? @Gang 我已经在回答中描述了。 @user9367133 正如 ksindi 所说,只需在您的 anaconda 发行版中安装 pyspark 并通过 python -m unittest script.py 运行测试【参考方案2】:

这是一种方法。在 CLI 调用中:

python -m unittest my_unit_test_script.py

代码

import functools
import unittest

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext


def rename_chars(column_name):
    chars = ((' ', '_&'), ('.', '_$'))
    new_cols = functools.reduce(lambda a, kv: a.replace(*kv), chars, column_name)
    return new_cols


def column_names(df):
    changed_col_names = df.schema.names
    for cols in changed_col_names:
        df = df.withColumnRenamed(cols, rename_chars(cols))
    return df


class RenameColumnNames(unittest.TestCase):
    def setUp(self):
        conf = SparkConf()
        sc = SparkContext(conf=conf)
        self.sqlContext = HiveContext(sc)

    def test_column_names(self):
        cols = ['ID', 'NAME', 'last.name', 'abc test']
        val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
        df = self.sqlContext.createDataFrame(val, cols)
        result = df.schema.names
        expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
        self.assertEqual(result, expected)

【讨论】:

它不起作用:预期:['ID', 'NAME', 'last.name', 'abc test'] 实际:['ID', 'NAME', 'last_$名称', 'abc_&test'] @EugeneLopatkin 问题不在于修复单元测试的正确性:-) @ksindi 如果我在本地机器上安装了pysparkhadoop,您的解决方案就可以工作。但对我来说,问题是如果我在edge node 上运行此脚本,那么作业将因No module error pyspark 而失败,因为在我的python 中我没有安装pysparkHow can I use the existing Hadoop environment and cloudera given anaconda to run the unittest @user9367133 你可以 pip install pyspark 从 spark 2.3 开始。也许将其添加为依赖项? @ksindi 如果我使用 pip 安装它会得到必要的二进制文件【参考方案3】:

假设您已安装 pyspark(例如,在 venv 上的 pip install pyspark),您可以使用下面的类在 unittest 中对其进行单元测试:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[*]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)
        cls.spark = pyspark.SQLContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

例子:

class SimpleTestCase(PySparkTestCase):

    def test_with_rdd(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])

    def test_with_df(self):
        df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']], 
                                        schema=['c1', 'c2'])
        self.assertEqual(df.count(), 2)

请注意,这会为每个类创建一个上下文。使用 setUp 而不是 setUpClass 来获取每个测试的上下文。这通常会在执行测试时增加大量开销时间,因为目前创建新的 Spark 上下文非常昂贵。

【讨论】:

【参考方案4】:

这是一种测试功能的轻量级方法。您无需下载 Spark 即可运行 PySpark 测试,如接受的答案大纲。下载 Spark 是一种选择,但不是必需的。这是测试:

import pysparktestingexample.*** as SO
from chispa import assert_df_equality
import pyspark.sql.functions as F

def test_column_names(spark):
    source_data = [
        ("jose", "oak", "switch")
    ]
    source_df = spark.createDataFrame(source_data, ["some first name", "some.tree.type", "a gaming.system"])

    actual_df = SO.column_names(source_df)

    expected_data = [
        ("jose", "oak", "switch")
    ]
    expected_df = spark.createDataFrame(expected_data, ["some_&first_&name", "some_$tree_$type", "a_&gaming_$system"])

    assert_df_equality(actual_df, expected_df)

测试使用的SparkSession在tests/conftest.py文件中定义:

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder \
      .master("local") \
      .appName("chispa") \
      .getOrCreate()

测试使用chispa库中定义的assert_df_equality函数。

这是 GitHub 存储库中的 your code 和 the test。

pytest 在 Python 社区中通常比 unittest 更受欢迎。 This blog post 解释了如何测试 PySpark 程序,讽刺的是有一个 modify_column_names 函数可以让你更优雅地重命名这些列。

def modify_column_names(df, fun):
    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, fun(col_name))
    return df

【讨论】:

以上是关于使用python对pyspark代码进行单元测试的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python 对 C 代码进行单元测试的最简单方法

第1129期对vue.js单文件(.vue)进行单元测试

对 Python GUI 应用程序进行单元测试的推荐方法是啥?

使用python对pyspark数据帧进行转置操作

如何对绘制 PDF 图形的 Python 函数进行单元测试?

Go 单元测试从 0 到 1