PySpark 将模型预测与未转换的数据对齐:最佳实践

Posted

技术标签:

【中文标题】PySpark 将模型预测与未转换的数据对齐:最佳实践【英文标题】:PySpark align model predictions with untransformed data: best practice 【发布时间】:2020-09-03 14:08:05 【问题描述】:

使用 PySpark 的 ML 模块,经常会发生以下步骤(在数据清洗等之后):

    执行特征和目标转换管道 创建模型 从模型生成预测 为业务用户和模型验证目的将预测和原始数据集合并在一起

采用简化的 sn-p 代码:

predictions = model.transform(test_df)

这个predictions 数据框将只有预测(以及概率,可能还有预测的转换)。但它不会包含原始数据集。

如何将预测与原始 PySpark DataFrame 相结合?

对我来说,如何将原始数据集(甚至是转换后的 test_df)与预测结合起来并不明显;没有要加入的共享列,adding an index column seems quite tricky for large datasets。

当前解决方案:

对于大型数据集,比如我正在使用的,我尝试过the suggestion here:

test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")


但我不喜欢这个有两个原因:

    我不完全确定是否维持秩序。该链接表明应该是,但我不明白为什么。 它有时会崩溃,即使我如上所示强制重新分区,当我尝试通过上面的最后一行写入数据时出现以下错误

Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition


我不想使用有时给出的 monotonically_increasing_id 建议,因为我的数据集太大而无法执行此操作。


这似乎很重要:如果无法将预测与原始目标进行比较,我怎么能报告任何模型质量。其他人是怎么做到的??

【问题讨论】:

【参考方案1】:

当调用model = <your ml-algorithm>.fit(df_train) 时,训练数据集可以有任意数量的附加列。只有包含特征和标签的列将用于训练模型(通常称为 featureslabel,可配置),但可以存在其他列。

在下一步中对已训练模型调用 predictions = model.transform(df_test) 时,将返回一个数据帧,其中包含 附加predictionprobabilityrawPrediction

尤其是原始特征列和标签列仍然是数据框的一部分。此外,作为 df_test 一部分的 any 列在输出中仍然可用,并可用于识别行。

prediction = model.transform(df_test)
prediction.printSchema()

打印

root
 |-- feature1: double (nullable = true)
 |-- feature2: double (nullable = true)
 |-- feature3: double (nullable = true)
 |-- label: double (nullable = true)
 |-- additional_data: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

如果df_test 不仅包含所需的列features,还包含其他列,包括label。例如,通过评估labelprediction,可以创建BinaryClassificationMetrics。

调用model.transform 在技术上是Dataset.withColumn call。


基于Spark docs 中的 ML Pipeline 示例的示例:Spark ML 工作流通常以包含训练数据、特征和标签(=目标值)的数据框开始。在此示例中,还存在一个与 ml 过程无关的附加列。

training_original = spark.createDataFrame([
    (0.0, 1.1, 0.1, 1.0, 'any random value that is not used to train the model'),
    (2.0, 1.0, -1.0, 0.0, 'another value'),
    (2.0, 1.3, 1.0, 0.0, 'value 3'),
    (0.0, 1.2, -0.5, 1.0, 'this value is also not used for training nor testing')],  
    ["feature1", "feature2", "feature3", "label", "additional_data"])

然后使用转换器将特征组合成一列。这个任务最简单的转换器是VectorAssembler

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features")
training_transformed = assembler.transform(training_original)
#+--------+--------+--------+-----+--------------------+--------------+          
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|     0.0|     1.1|     0.1|  1.0|any random value ...| [0.0,1.1,0.1]|
#| ...

现在可以使用 featureslabel 列在此数据帧上训练模型。附加列存在,但将被fit 方法忽略。

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_transformed)

现在根据测试数据对模型进行测试。准备与训练数据相同:

test_df = spark.createDataFrame([
    (-1.0, 1.5, 1.3, 1.0, 'test value 1'),
    (3.0, 2.0, -0.1, 0.0, 'another test value'),
    (0.0, 2.2, -1.5, 1.0, 'this is not important')],
    ["feature1", "feature2", "feature3", "label", "additional_data"])
test_df_transformed = assembler.transform(test_df)
#+--------+--------+--------+-----+--------------------+--------------+
#|feature1|feature2|feature3|label|     additional_data|      features|
#+--------+--------+--------+-----+--------------------+--------------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|
#| ...

运行机器学习魔法产生

prediction = model.transform(test_df_transformed)
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|feature1|feature2|feature3|label|     additional_data|      features|       rawPrediction|         probability|prediction|
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|    -1.0|     1.5|     1.3|  1.0|        test value 1|[-1.0,1.5,1.3]|[-6.5872014439355...|[0.00137599470692...|       1.0|
#| ...

这个数据框现在包含原始输入数据(feature1feature3additional_data)、预期目标值(label)、转换后的特征(features)和模型预测的结果(prediction)。这是所有输入值、目标值和预测都在一个数据集中可用的地方。这里是评估模型和计算模型所需指标的地方。将模型应用于新数据会得到相同的结果(当然没有label 列)。

【讨论】:

嗨@werner,我想我不清楚,因为你刚刚证明了我的挑战。输入值的 DF 只能包含特征。我的原始数据集对其进行转换以满足 Spark MLLib model.fit 的需求之前,还包含目标值。所以现在我需要将原始数据集与目标值与具有预测的数据集结合起来。对于任何大小合适的数据集,我都找不到加入数据集的方法。到目前为止我得到的最好的建议是push to CSV and do a CLI paste 命令,这似乎很疯狂。 @MikeWilliamson 附加列对于model.fitmodel.transform 没有问题。输入数据集不限于特征和标签,它们可以包含那里的任何列。 Spark ML 工作流的想法是不断向现有数据集添加新列,而不会在途中删除列。我在示例中添加了更多细节,希望现在可以更好地理解我的意思。 当然。我很笨,以为我可以提供一个包含特征的DF。我得到它与您保留附加列的建议一起使用。由于我仍然不明白的原因,将预测写入磁盘需要 forever(数小时),而将输入数据集写入磁盘只需几分钟。 (相同的分区方案。)非常感谢!

以上是关于PySpark 将模型预测与未转换的数据对齐:最佳实践的主要内容,如果未能解决你的问题,请参考以下文章

PySpark MLLIB 随机森林:预测总是 0

pyspark GBTRegressor 对象在加载模型后没有属性“转换”

重构pyspark数据框

将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF

将列表转换为 pyspark 数据框

在 pyspark 中转换或处理日期数据类型的最佳方法是啥