pyspark中使用累加器Accumulator统计指标

Posted WOTGL

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark中使用累加器Accumulator统计指标相关的知识,希望对你有一定的参考价值。

    评价分类模型的性能时需要用到以下四个指标

    最开始使用以下代码计算,发现代码需要跑近一个小时,而且这一个小时都花在这四行代码上

# evaluate model
TP = labelAndPreds.filter(lambda (v, p): (v == 1 and p == 1)).count()
FP = labelAndPreds.filter(lambda (v, p): (v == 0 and p == 1)).count()
TN = labelAndPreds.filter(lambda (v, p): (v == 0 and p == 0)).count()
FN = labelAndPreds.filter(lambda (v, p): (v == 1 and p == 0)).count()

    心想着理论上可以只扫描一遍数据就可以计算出这四个指标。

    一开始在foreach函数中传递一个自定义评估函数,这个函数来统计上面四个指标,然后在函数里再使用全局变量TP,TN等。

    但是程序跑完四个指标都还是0,跟初始化时候的一样。后来查资料,发现pyspark有Accumulator(累加器)可以解决这个问题。

代码如下:

# evaluate model
TP = sc.accumulator(0)  #一开始直接用的TP = 0
FP = sc.accumulator(0) 
TN = sc.accumulator(0)
FN = sc.accumulator(0)
def assess(v, p):
    global TP
    global FP
    global TN
    global FN    
    #print \'tgl\\t\',v,p
    if(v == 1 and p == 1):
        TP += 1
    if(v == 0 and p == 1):
        FP += 1
    if(v == 0 and p == 0):
        TN += 1
    if(v == 1 and p == 0):
        FN += 1
print \'assess model %s\' % time.ctime()
labelAndPreds.foreach(lambda(v,p): assess(v, p))
print "TP=", TP
print "FP=", FP
print "TN=", TN
print "FN=", FN
if (TP.value + FP.value) != 0:
      print "The precision = " + str(TP.value*1.0 / (TP.value+FP.value))
if (TP.value + FN.value) != 0:
      print "The recall = " + str(TP.value*1.0 / (TP.value+FN.value))

 

ps:

    pyspark官方文档

    [http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=accumulator#pyspark.Accumulator]

以上是关于pyspark中使用累加器Accumulator统计指标的主要内容,如果未能解决你的问题,请参考以下文章

Accumulator

Spark(Accumulator)陷阱及解决办法

Flink的累加器和广播变量广播流分布式缓存

生产常用Spark累加器剖析之二

python pyspark-RevenuePerProductForMonthAccumulator.py

Spark Accumulators