Pyspark 分组和结构化数据

Posted

技术标签:

【中文标题】Pyspark 分组和结构化数据【英文标题】:Pyspark grouping and structuring data 【发布时间】:2020-10-08 20:40:35 【问题描述】:

我在 spark 2.4.5 中有以下数据:

data = [
    ('1234', '203957', '2010', 'London', 'CHEM'),
    ('1234', '203957', '2010', 'London', 'BIOL'),
    ('1234', '288400', '2012', 'Berlin', 'MATH'),
    ('1234', '288400', '2012', 'Berlin', 'CHEM'),
]
d = spark.createDataFrame(data, ['auid', 'eid', 'year', 'city', 'subject'])
d.show()

+----+------+----+------+-------+
|auid|   eid|year|  city|subject|
+----+------+----+------+-------+
|1234|203957|2010|London|   CHEM|
|1234|203957|2010|London|   BIOL|
|1234|288400|2012|Berlin|   MATH|
|1234|288400|2012|Berlin|   CHEM|
+----+------+----+------+-------+

我需要从中获取按auid分组的df,并按城市的时间顺序排列,即另一列中的London, Berlin[[Berlin, 2010], [London, 2012]],另外我需要按主题的降序频率列排序:[CHEM,2], [BIOL, 1], [MATH, 1]。或者就像[CHEM, BIOL, MATH]

我试过了:

d.groupBy('auid').agg(func.collect_set(func.struct('city', 'year')).alias('city_set')).show(10, False)

这导致了:

+----+--------------------------------+
|auid|city_set                        |
+----+--------------------------------+
|1234|[[Berlin, 2012], [London, 2010]]|
+----+--------------------------------+

我在这里卡住了,需要帮助。 (不胜感激对city_set 中的值进行排序的提示)

【问题讨论】:

【参考方案1】:

您可以对struct('year', 'city') 进行collect_list 的聚合,对数组进行排序,然后使用transform 函数调整字段的顺序。与主题类似,创建具有两个字段的结构数组:cntsubject,对结构数组进行排序/降序,然后仅检索 subject 字段:

df_new = d.groupBy('auid').agg(
      func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set'),
      func.collect_list('subject').alias('subjects')
    ).withColumn('city_set', func.expr("transform(city_set, x -> (x.city as city, x.year as year))")) \
    .withColumn('subjects', func.expr("""
        sort_array(
          transform(array_distinct(subjects), x -> (size(filter(subjects, y -> y=x)) as cnt, x as subject)),
          False
        ).subject
      """))

df_new.show(truncate=False) 
+----+--------------------------------+------------------+
|auid|city_set                        |subjects          |
+----+--------------------------------+------------------+
|1234|[[London, 2010], [Berlin, 2012]]|[CHEM, MATH, BIOL]|
+----+--------------------------------+------------------+

编辑:有几种方法可以删除city_set数组中重复的城市条目:

    使用Window函数将每个城市的year调整为min(year),然后重复上述过程。

    d = d.withColumn('year', func.min('year').over(Window.partitionBy('auid','city')))
    

    使用aggregate 函数从city_set 数组中删除重复项:

    df_new = d.groupBy('auid').agg(
        func.sort_array(func.collect_set(func.struct('year', 'city'))).alias('city_set')     
    ).withColumn("city_set", func.expr("""         
        aggregate(        
          /* expr: take slice of city_set array from the 2nd element to the last */
          slice(city_set,2,size(city_set)-1),           
          /* start: initialize `acc` as an array with a single entry city_set[0].city */
          array(city_set[0].city),
          /* merge: iterate through `expr`, if x.city exists in `acc`, keep as-is
           *        , otherwise add an entry to `acc` using concat function */
          (acc,x) -> IF(array_contains(acc,x.city), acc, concat(acc, array(x.city)))                     
        )                              
    """))
    

注意:使用 Spark 3.0+ 会更容易:

df_new = d.groupBy('auid').agg(func.expr("array_sort(collect_set((city,year)), (l,r) -> int(l.year-r.year)) as city_set"))

【讨论】:

太好了,谢谢!还不知道,如何得到一个只有城市的列,按时间顺序排序,即[伦敦,柏林]? @ande ,如果您只需要结构数组中的一个字段,只需跳过转换函数并使用 dot 表示法:d.groupBy('auid').agg(func.sort_array(func.collect_set(func.struct('year', 'city'))).city.alias('city_set')),另请参阅我们对subjects 专栏。 顺便说一句。一个更健壮的方法是使用.getItem('city')['city'],以防字段名称包含特殊字符,如空格、点等。 再次,非常感谢!不过,我确实看到了我的示例数据的一些限制。可能有两个不同的文件附属于伦敦,所以在这种情况下,我看到 [London, London, Berlin],而我想看到 [London, Berlin]。你认为这可能吗? @ande,不客气。对于您刚才提到的问题,我认为一个简单的解决方法是使用 Window 函数创建一个临时列(或只是覆盖现有的year 列),例如:d = d.withColumn('year_min', func.min('year').over(Window.partitionBy('auid','city'))),然后使用 year_min 替换代码中的年份。

以上是关于Pyspark 分组和结构化数据的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 从结构化流数据帧构建 Spark ML 管道模型

kafka 到 pyspark 结构化流,将 json 解析为数据帧

使用 pyspark 处理结构数据类型

pyspark - 分组和计算数据

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

合并列是变量结构的数据框 - Pyspark