spark过滤算子+StringIndexer算子出发的一个逻辑bug

Posted the.forgotten

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark过滤算子+StringIndexer算子出发的一个逻辑bug相关的知识,希望对你有一定的参考价值。

问题描述:

在一段spark机器学习的程序中,同时用到了Filter算子和StringIndexer算子,其中StringIndexer在前,filter在后,并且filter是对stringindexer的输出列设置了过滤条件,filter算子之后将数据集灌到随机森林中(试过决策树分类和逻辑回归同样都会触发bug,与filter后面具体是什么算子没有关系),然后再运行的时候报了一个错,错误的原因是源数据中出现了stringindexer模型中没有的标签值。用过stringindexer这个算子的人应该了解,这个算子其实就是把输入数据集中的某一列(一般是离散值)进行编码,按照值出现的次数降序排序存放到一个数组中,然后每种标签值就被映射为它在数组中对应的下标值,这样讲离散的字符串变量转换为double类型的值,方便后面灌入到机器学习算法中进行处理。回到我之前说的报的错误:源数据中出现了stringindexer模型中没有的标签值。这个就很费解,因为stringindexer是根据输入数据进行训练得到的模型,然后我现在用同样的一份数据通过stringindexer训练的模型再进行转换却出现了问题!!

这个问题我想了很久,最后发现跟spark-sql中的基于规则的优化(RBO)有关,具体的说跟PredictionPushdown(过滤条件下推)有关。回到spark程序中,我开始仔细分析整个数据处理流程。stringindexer的输入数据集其实是通过两个数据集进行内等连接得到的,我们知道内等连接其实是能够对数据起到过滤效果的。

另一方面,filter算子由于PredictionPushdown的优化,在实际进行物理计算的时候并不是在stringindexer转换之后执行的,而是被下推到了其中一张表读取的最开始的时候,并且这个下推的过滤条件中是带有一个UDF的,因为在我的程序中,filter算子是对stringindexer输出列设置过滤条件,那么这个输出列再源数据中是不存在的,源数据中只存在stringindexer的输入列,因此实际上下推后的过滤条件是先对这个原始标签列转换为double类型,然后再根据filter中设置的过滤条件判断这个double类型的值,从而实现真正的过滤逻辑。

至此,产生bug的所有条件已经具备,我再把为什么会出现这个错误的原因整个理一下:

原始数据有两张表,对这两张表进行join后的数据输入到stringindexer中,假设其中一张表中有一列标签列rawlabel,他的离散值是‘a‘,‘b‘,‘c‘,‘d‘,但是经过join后只剩下了‘a‘,‘b‘,‘c‘三种值,那么这个数据输入到stringindexer进行训练后得到的模型中保存了一个map, map的内容是(‘a‘->0,‘b‘->1,‘c‘->2), stringindexer的模型根据这个map对输入的数据进行转换。 那么rawlabel经过stringindexer转换后产生一个新的输出列convertedlabel, 然后filter算子对convertedlabel设置的条件是小于1。ok,至此整个计算逻辑已经完了,但是在实际执行的时候,对于convertedlabel的过滤条件会连着stringindexer模型中的map被整合成一个UDF下推到其中一张表的数据读取阶段,这时候如果读到一条rawlabel的值是‘d‘的数据,那么输入这个UDF中就会在map中找不到对应的映射,这时候就会报错。

以上就是这个问题的来龙去脉,我们可以避免这个bug,也就是当stringindexer前面有join这样的算子,那么在stringindexer之后不要对stringindexer的输出列设置过滤条件,可以从业务逻辑上考虑把过滤算子提前到stringindexer之前,直接对原始的列设置等价的过滤条件。

以上是关于spark过滤算子+StringIndexer算子出发的一个逻辑bug的主要内容,如果未能解决你的问题,请参考以下文章

[Spark精进]必须掌握的4个RDD算子之filter算子

spark算子

Spark性能调优-RDD算子调优篇

Spark性能调优-RDD算子调优篇

Spark Transformation和Action算子速查表

Spark Core学习之常用算子(含经典面试题)