如何在 PySpark 中的 RDD 中的列中查找标准差
Posted
技术标签:
【中文标题】如何在 PySpark 中的 RDD 中的列中查找标准差【英文标题】:How to find standard deviation in a column in a RDD in PySpark 【发布时间】:2015-12-03 08:24:15 【问题描述】:我有一个RDD
,我想在RDD
的列之一的数据中找到standard deviation
。我当前的代码是:
def extract(line):
# line[11] is the column in which I want to find standard deviation
return (line[1],line[2],line[5],line[6],line[8],line[10],line[11])
inputfile1 = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)
data = (inputfile1
.map(lambda line: line.split(";"))
.filter(lambda line: len(line) >1 )
.map(extract)) # Map to tuples
data
是 RDD
,其中我的最后一列(第 6 列)具有我想要在其中找到 standard deviation
的值。我怎样才能找到它?
更新:我当前的代码:
def extract(line):
# last column is numeric but in string format
return ((float(line[-1])))
input = sc.textFile('file1.csv').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)
Data = (input
.map(lambda line: line.split(";"))
.filter(lambda line: len(line) >1 )
.map(extract)) # Map to tuples
row = Row("val")
df = Data.map(row).toDF()
df.map(lambda r: r.x).stdev()
当我运行它时,我得到错误:AttributeError: x
df.map(lambda r: r.x).stdev()
。 注意:我的数据中有一些值为负数
【问题讨论】:
【参考方案1】:在 Spark
转换为RDD并使用stdev
方法:
from pyspark.sql import Row
import numpy as np
row = Row("x")
df = sc.parallelize([row(float(x)) for x in np.random.randn(100)]).toDF()
df.rdd.map(lambda r: r.x).stdev()
使用以下公式(here is Scala 版本):
from pyspark.sql.functions import avg, pow, col, sqrt, lit
sd = sqrt(
avg(col("x") * col("x")) - pow(avg(col("x")), lit(2))).alias("stdev")
df.select(sd)
Hive UDF:
df.registerTempTable("df")
sqlContext.sql("SELECT stddev(x) AS sd FROM df")
Spark 1.6.0 引入了stddev
、stddev_samp
和stddev_pop
函数。
【讨论】:
以上是关于如何在 PySpark 中的 RDD 中的列中查找标准差的主要内容,如果未能解决你的问题,请参考以下文章
pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)
使用 map/filter 在 Pyspark 中的 RDD 中查找最大元素
如何在不使用collect()和for循环的情况下将一个(IP地址)的特定部分与RDD python pyspark中另一列中的其他IP地址进行比较