Pandas .apply Loc 到 Pyspark

Posted

技术标签:

【中文标题】Pandas .apply Loc 到 Pyspark【英文标题】:Pandas .apply Loc to Pyspark 【发布时间】:2017-10-25 14:56:17 【问题描述】:

我对 Pandas 做了一些工作。现在我需要在 Pyspark 上做同样的事情,这个任务似乎很棘手!

这是我的代码:

import pandas as pd
def av_years(df,start,end):
return df.loc[df['year'].isin(range(start,end+1))]['B'].mean() 

然后我创建了一个数据框:

raw_data = 
    'year': [2010,2011,2012,2013],
    'B': [2,3,5,4],'startyear':[2012,2010,2011,2011],'endyear':
     [2012,2013,2013,2013]
     df = pd.DataFrame(raw_data)
     df

这是我的结果:

    B   endyear startyear   year
    0   2   2012    2012    2010
    1   3   2013    2010    2011
    2   5   2013    2011    2012
    3   4   2013    2011    2013

最后一步是创建一个从数据框派生的新列:

  df['av'] = df.apply(lambda row: av_years(df,row['startyear'], 
  row['endyear']), axis=1)
  df

我的最终结果是:

 B  endyear startyear   year    av
 0  2   2012    2012    2010    5.0
 1  3   2013    2010    2011    3.5
 2  5   2013    2011    2012    4.0
 3  4   2013    2011    2013    4.0

我需要使用 PySpark 获取同一张表!一些建议?

谢谢

【问题讨论】:

【参考方案1】:

对于数据帧中的每一行,您都要遍历数据帧的所有行(复杂度 n²)。这相当于进行自联接。 在对验证条件r2.year.isin(range(r1.startyear, r1.endyear + 1)) 的行对(r1, r2) 进行过滤后,您可以按startyear, endyear, year 分组以计算mean(B)

注意:在 Spark 中,您可以一步完成 joinfilter

首先让我们从您的 pandas df 创建数据框:

data = spark.createDataFrame(df)

对于自连接,我们将使用别名以避免与列名冲突:

import pyspark.sql.functions as psf
data_join = data.select("startyear", "endyear", "year").alias("left")\
    .join(
        data.select("B", "year").alias("right"), 
        psf.col("right.year").between(psf.col("left.startyear"), psf.col("left.endyear")))\
    .drop("right.year")

    +---------+-------+----+---+
    |startyear|endyear|year|  B|
    +---------+-------+----+---+
    |     2010|   2013|2011|  2|
    |     2010|   2013|2011|  3|
    |     2012|   2012|2010|  5|
    |     2010|   2013|2011|  5|
    |     2010|   2013|2011|  4|
    |     2011|   2013|2012|  3|
    |     2011|   2013|2013|  3|
    |     2011|   2013|2012|  5|
    |     2011|   2013|2012|  4|
    |     2011|   2013|2013|  5|
    |     2011|   2013|2013|  4|
    +---------+-------+----+---+

现在是groupBy

data_join\
    .groupBy("startyear", "endyear", "year")\
    .agg(psf.avg("B").alias("av")).show()

    +---------+-------+----+---+
    |startyear|endyear|year| av|
    +---------+-------+----+---+
    |     2011|   2013|2013|4.0|
    |     2010|   2013|2011|3.5|
    |     2012|   2012|2010|5.0|
    |     2011|   2013|2012|4.0|
    +---------+-------+----+---+

【讨论】:

谢谢玛丽! 没问题 Lizou :) 如果确实解决了,请不要忘记将您的问题标记为已解决【参考方案2】:

这是另一种方法:

raw_data=sc.parallelize(['2\t2012\t2012\t2010\t5.0', \
                        '3\t2013\t2010\t2011\t3.5', \
                        '5\t2013\t2011\t2012\t4.0', \
                        '4\t2013\t2011\t2013\t4.0']).map(lambda x: x.split('\t'))\
                                                      .map(lambda x: (int(x[0]),int(x[1])\
                                                      ,int(x[2]),int(x[3]),float(x[4])))

raw_data_df=sqlContext.createDataFrame(rawdata,['B','endyear','startyear','year','av'])

raw_data_df.show()

+---+-------+---------+----+---+
|  B|endyear|startyear|year| av|
+---+-------+---------+----+---+
|  2|   2012|     2010|2010|5.0|
|  3|   2013|     2010|2011|3.5|
|  5|   2013|     2011|2012|4.0|
|  4|   2013|     2011|2013|4.0|
+---+-------+---------+----+---+

假设您有一个 csv 文件中的数据:

这就是它在文件名中的样子raw_data.csv

2,2012,2010,2010,5.0
3,2013,2010,2011,3.5
5,2013,2011,2012,4.0
4,2013,2011,2013,4.0
3,2008,2011,2011,4.0
5,2013,2019,2012,4.0
4,2005,2012,2016,4.0
4,2013,2013,2012,4.0
8,2018,2014,2018,4.0
5,2013,2014,2012,4.0

导入必要的模块:

from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, Row
import pyspark.sql.functions as func
from pyspark.sql import SparkSession

定义结构,读取文件:

rawdata_path = 'raw_data.csv'

rawdata_struct = artistdata_struct = StructType([StructField('B', IntegerType()), \
                                                 StructField('endyear', IntegerType()), \
                                                 StructField('startyear',IntegerType()), \
                                                 StructField('year',IntegerType()),\
                                                 StructField('av',DoubleType())])

rawdata= spark.read.csv(rawdata_path, sep = ',', schema = rawdata_struct)

rawdata.show()

+---+-------+---------+----+---+
|  B|endyear|startyear|year| av|
+---+-------+---------+----+---+
|  2|   2012|     2010|2010|5.0|
|  3|   2013|     2010|2011|3.5|
|  5|   2013|     2011|2012|4.0|
|  4|   2013|     2011|2013|4.0|
|  3|   2008|     2011|2011|4.0|
|  5|   2013|     2019|2012|4.0|
|  4|   2005|     2012|2016|4.0|
|  4|   2013|     2013|2012|4.0|
|  8|   2018|     2014|2018|4.0|
|  5|   2013|     2014|2012|4.0|
+---+-------+---------+----+---+

有关 Spark 数据类型的更多信息,请查看此链接

https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/types/package-summary.html

【讨论】:

我创建了小数据框以使其易于理解!在现实中,我使用一个巨大的数据框。那么当我有一个非常大的数据框时,我如何应用你的方法呢? 谢谢!我在上面修改了我的答案。我假设您的数据位于 .csv 文件中。您可以修改分隔符以匹配原始文件中的内容。

以上是关于Pandas .apply Loc 到 Pyspark的主要内容,如果未能解决你的问题,请参考以下文章

pandas.DataFrame.loc好慢,怎么遍历访问DataFrame比较快

pandas 用 .loc[,]=value 筛选并原地赋值回原来的 DataFrame

pandas dataframe 过滤——apply最灵活!!!

pandas.DataFrame.where和mask 解读

6.Pandas怎样新增数据列

pandas基本应用记录