如何在 PySpark 中的 RDD 中的列中查找标准差

Posted

技术标签:

【中文标题】如何在 PySpark 中的 RDD 中的列中查找标准差【英文标题】:How to find standard deviation in a column in a RDD in PySpark 【发布时间】:2015-12-03 08:24:15 【问题描述】:

我有一个RDD,我想在RDD 的列之一的数据中找到standard deviation。我当前的代码是:

def extract(line):
    # line[11] is the column in which I want to find standard deviation
    return (line[1],line[2],line[5],line[6],line[8],line[10],line[11])

inputfile1 = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)


data = (inputfile1
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >1 )
    .map(extract)) # Map to tuples

dataRDD,其中我的最后一列(第 6 列)具有我想要在其中找到 standard deviation 的值。我怎样才能找到它?

更新:我当前的代码:

def extract(line):
    # last column is numeric but in string format
    return ((float(line[-1])))


input = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)

Data = (input
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >1 )
    .map(extract)) # Map to tuples

 row = Row("val") 
 df = Data.map(row).toDF()
 df.map(lambda r: r.x).stdev()

当我运行它时,我得到错误:AttributeError: xdf.map(lambda r: r.x).stdev()注意:我的数据中有一些值为负数

【问题讨论】:

【参考方案1】:

在 Spark

转换为RDD并使用stdev方法:

from pyspark.sql import Row
import numpy as np

row = Row("x")

df = sc.parallelize([row(float(x)) for x in np.random.randn(100)]).toDF()
df.rdd.map(lambda r: r.x).stdev()

使用以下公式(here is Scala 版本):

from pyspark.sql.functions import avg, pow, col, sqrt, lit

sd = sqrt(
   avg(col("x") * col("x")) - pow(avg(col("x")), lit(2))).alias("stdev")

df.select(sd)

Hive UDF:

df.registerTempTable("df")

sqlContext.sql("SELECT stddev(x) AS sd FROM df")

Spark 1.6.0 引入了stddevstddev_sampstddev_pop 函数。

【讨论】:

以上是关于如何在 PySpark 中的 RDD 中的列中查找标准差的主要内容,如果未能解决你的问题,请参考以下文章

pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)

PySpark:将 RDD 转换为数据框中的列

使用 map/filter 在 Pyspark 中的 RDD 中查找最大元素

如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较

如何更改pyspark中的列元数据?

遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框