pyspark - 分组和计算数据
Posted
技术标签:
【中文标题】pyspark - 分组和计算数据【英文标题】:pyspark - Grouping and calculating data 【发布时间】:2018-03-05 12:43:52 【问题描述】:我有以下 csv 文件。
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
我必须创建一个 RDD,其中 USER MODEL 和 GT 是 PRIMARY KEY,我不知道我是否必须将它们用作元组。
然后,当我有主键字段时,我必须从 'x'、'y' 和 'z' 计算 AVG、MAX 和 MIN。
这是一个输出:
User,Model,gt,media(x,y,z),desviacion(x,y,z),max(x,y,z),min(x,y,z)
a, nexus4,stand,-3.0,0.7,8.2,2.8,0.14,0.0,-1.0,0.8,8.2,-5.0,0.6,8.2
关于如何对它们进行分组以及例如从“x”获取媒体值的任何想法
使用我当前的代码,我得到以下内容。
# Data loading
lectura = sc.textFile("Phones_accelerometer.csv")
datos = lectura.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(x.split(",")[3], x.split(",")[4], x.split(",")[5])))
sumCount = datos.combineByKey(lambda value: (value, 1), lambda x, value: (x[0] + value, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))
我的元组示例:
[(('a', 'nexus4', 'stand'), ('-5.958191', '0.6880646', '8.135345'))]
【问题讨论】:
为什么是 RDD 而不是数据帧?这两者都可以轻松完成,我更喜欢数据框 我真的不知道,我认为使用 DF 更容易生成我的期望输出,但想法是关于使用 RDD。 【参考方案1】:如果您在问题中给出的文件中有 csv 数据,那么您可以使用 sqlContext
将其读取为 dataframe
并将适当的类型转换为
df = sqlContext.read.format("com.databricks.spark.csv").option("header", True).load("path to csv file")
import pyspark.sql.functions as F
import pyspark.sql.types as T
df = df.select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x').cast('float'), F.col('y').cast('float'), F.col('z').cast('float'))
我只选择了应该给你的主键和必要的列
+----+------+-----+----------+---------+--------+
|User|Model |gt |x |y |z |
+----+------+-----+----------+---------+--------+
|a |nexus4|stand|-5.958191 |0.6880646|8.135345|
|a |nexus4|stand|-5.95224 |0.6702118|8.136536|
|a |nexus4|stand|-5.9950867|0.6535492|8.204376|
|a |nexus4|stand|-5.9427185|0.6761627|8.128204|
+----+------+-----+----------+---------+--------+
您的所有要求:中值、偏差、最大值和最小值在按主键分组时取决于 x
、y
和 z
的列表:User, Model and gt
.
所以你需要groupBy
和collect_list
内置函数 和一个udf
函数来计算你的所有需求。最后一步是将它们分隔在下面给出的不同列中
from math import sqrt
def calculation(array):
num_items = len(array)
print num_items, sum(array)
mean = sum(array) / num_items
differences = [x - mean for x in array]
sq_differences = [d ** 2 for d in differences]
ssd = sum(sq_differences)
variance = ssd / (num_items - 1)
sd = sqrt(variance)
return [mean, sd, max(array), min(array)]
calcUdf = F.udf(calculation, T.ArrayType(T.FloatType()))
df.groupBy('User', 'Model', 'gt')\
.agg(calcUdf(F.collect_list(F.col('x'))).alias('x'), calcUdf(F.collect_list(F.col('y'))).alias('y'), calcUdf(F.collect_list(F.col('z'))).alias('z'))\
.select(F.col('User'), F.col('Model'), F.col('gt'), F.col('x')[0].alias('median_x'), F.col('y')[0].alias('median_y'), F.col('z')[0].alias('median_z'), F.col('x')[1].alias('deviation_x'), F.col('y')[1].alias('deviation_y'), F.col('z')[1].alias('deviation_z'), F.col('x')[2].alias('max_x'), F.col('y')[2].alias('max_y'), F.col('z')[2].alias('max_z'), F.col('x')[3].alias('min_x'), F.col('y')[3].alias('min_y'), F.col('z')[3].alias('min_z'))\
.show(truncate=False)
所以最后你应该有
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|User|Model |gt |median_x |median_y |median_z|deviation_x|deviation_y|deviation_z|max_x |max_y |max_z |min_x |min_y |min_z |
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
|a |nexus4|stand|-5.962059|0.6719971|8.151115|0.022922019|0.01436464 |0.0356973 |-5.9427185|0.6880646|8.204376|-5.9950867|0.6535492|8.128204|
+----+------+-----+---------+---------+--------+-----------+-----------+-----------+----------+---------+--------+----------+---------+--------+
希望回答对你有帮助。
【讨论】:
【参考方案2】:您必须使用 groupByKey
来获得中位数。虽然performance reasons 通常不是首选,但很难并行化查找数字列表的中值。计算中位数的逻辑需要整个数字列表。 groupByKey
是需要同时处理一个键的所有值时使用的聚合方法
此外,如 cmets 中所述,使用 Spark DataFrames 会更轻松地完成此任务。
【讨论】:
以上是关于pyspark - 分组和计算数据的主要内容,如果未能解决你的问题,请参考以下文章
在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?