使用python对pyspark代码进行单元测试
Posted
技术标签:
【中文标题】使用python对pyspark代码进行单元测试【英文标题】:Unit test pyspark code using python 【发布时间】:2018-03-22 04:37:16 【问题描述】:我在pyspark
中有脚本,如下所示。我想在这个脚本中对function
进行单元测试。
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_$'))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
我写了如下unittest
来测试功能。
但我不知道如何提交unittest
。我已经完成了spark-submit
,它什么也没做。
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
如何将此脚本集成为unittest
我可以在安装了pyspark
的节点上运行什么?
【问题讨论】:
unittest
问题似乎在本地机器上解决了,如何使用pip
anaconda
在服务器上创建一个virtualenv 是另一个主题,您可以创建一个不同的线程来制作服务器上的安装、测试和开发,
【参考方案1】:
Pyspark 单元测试指南
1.您需要从站点download Spark 分发并解压。或者,如果您已经拥有 Spark 和 Python 的工作发行版,只需安装 pyspark:pip install pyspark
2.根据需要设置这样的系统变量:
export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"
我在主目录的 .profile 中添加了这个。 如果您已经有 Spark 的有效分发版,则可以设置此变量。
3.另外你可能需要设置:
PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"
Python 和罐子?是的。 Pyspark uses py4j 与 Spark 的 java 部分进行通信。如果你想解决更复杂的情况,比如run Kafka server with tests in Python,或者像例子中那样使用来自 Scala 的 TestHiveContext,你应该指定 jars。 我是通过 Idea 运行配置环境变量做到的。
4.您可以使用pyspark/tests.py
、pyspark/streaming/tests.py
、pyspark/sql/tests.py
、pyspark/ml/tests.py
、pyspark/mllib/tests.py
scripts,其中包含用于测试 pyspark 应用程序的各种 TestCase 类和示例。在您的情况下,您可以这样做(来自 pyspark/sql/tests.py 的示例):
class HiveContextSQLTests(ReusedPySparkTestCase):
@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
但您需要在 PYSPARK_SUBMIT_ARGS 中指定 --jars 和 Hive 库,如前所述
或者没有 Hive:
class SQLContextTests(ReusedPySparkTestCase):
def test_get_or_create(self):
sqlCtx = SQLContext.getOrCreate(self.sc)
self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)
据我所知,如果 pyspark 是通过 pip
安装的,您没有在示例中描述的 tests.py。 在这种情况下,只需从 Spark 站点下载分发并复制代码示例。
现在您可以正常运行您的 TestCase:python -m unittest test.py
更新: 由于不推荐使用 HiveContext 和 SqlContext 的 spark 2.3。 您可以使用 SparkSession Hive API。
【讨论】:
对我来说,问题是我无法在我的 edgenode 中安装spark
。我在我的edgenode 中安装了native python
,cloudera 提供了anaconda
用于pyspark
我想使用cloudera 提供的spark
在边缘节点上运行unittest
。
只是好奇,这里相关的jar
文件是什么?
@Gang 我已经在回答中描述了。
@user9367133 正如 ksindi 所说,只需在您的 anaconda
发行版中安装 pyspark
并通过 python -m unittest script.py
运行测试【参考方案2】:
这是一种方法。在 CLI 调用中:
python -m unittest my_unit_test_script.py
代码
import functools
import unittest
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_$'))
new_cols = functools.reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
class RenameColumnNames(unittest.TestCase):
def setUp(self):
conf = SparkConf()
sc = SparkContext(conf=conf)
self.sqlContext = HiveContext(sc)
def test_column_names(self):
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = self.sqlContext.createDataFrame(val, cols)
result = df.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
【讨论】:
它不起作用:预期:['ID', 'NAME', 'last.name', 'abc test'] 实际:['ID', 'NAME', 'last_$名称', 'abc_&test'] @EugeneLopatkin 问题不在于修复单元测试的正确性:-) @ksindi 如果我在本地机器上安装了pyspark
和hadoop
,您的解决方案就可以工作。但对我来说,问题是如果我在edge node
上运行此脚本,那么作业将因No module error pyspark
而失败,因为在我的python 中我没有安装pyspark
。 How can I use the existing Hadoop environment and cloudera given anaconda to run the unittest
@user9367133 你可以 pip install pyspark
从 spark 2.3 开始。也许将其添加为依赖项?
@ksindi 如果我使用 pip 安装它会得到必要的二进制文件【参考方案3】:
假设您已安装 pyspark
(例如,在 venv 上的 pip install pyspark
),您可以使用下面的类在 unittest
中对其进行单元测试:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[*]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
cls.spark = pyspark.SQLContext(cls.sc)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
例子:
class SimpleTestCase(PySparkTestCase):
def test_with_rdd(self):
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
def test_with_df(self):
df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']],
schema=['c1', 'c2'])
self.assertEqual(df.count(), 2)
请注意,这会为每个类创建一个上下文。使用 setUp
而不是 setUpClass
来获取每个测试的上下文。这通常会在执行测试时增加大量开销时间,因为目前创建新的 Spark 上下文非常昂贵。
【讨论】:
【参考方案4】:这是一种测试功能的轻量级方法。您无需下载 Spark 即可运行 PySpark 测试,如接受的答案大纲。下载 Spark 是一种选择,但不是必需的。这是测试:
import pysparktestingexample.*** as SO
from chispa import assert_df_equality
import pyspark.sql.functions as F
def test_column_names(spark):
source_data = [
("jose", "oak", "switch")
]
source_df = spark.createDataFrame(source_data, ["some first name", "some.tree.type", "a gaming.system"])
actual_df = SO.column_names(source_df)
expected_data = [
("jose", "oak", "switch")
]
expected_df = spark.createDataFrame(expected_data, ["some_&first_&name", "some_$tree_$type", "a_&gaming_$system"])
assert_df_equality(actual_df, expected_df)
测试使用的SparkSession在tests/conftest.py
文件中定义:
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope='session')
def spark():
return SparkSession.builder \
.master("local") \
.appName("chispa") \
.getOrCreate()
测试使用chispa库中定义的assert_df_equality
函数。
这是 GitHub 存储库中的 your code 和 the test。
pytest 在 Python 社区中通常比 unittest 更受欢迎。 This blog post 解释了如何测试 PySpark 程序,讽刺的是有一个 modify_column_names
函数可以让你更优雅地重命名这些列。
def modify_column_names(df, fun):
for col_name in df.columns:
df = df.withColumnRenamed(col_name, fun(col_name))
return df
【讨论】:
以上是关于使用python对pyspark代码进行单元测试的主要内容,如果未能解决你的问题,请参考以下文章
对 Python GUI 应用程序进行单元测试的推荐方法是啥?