Spark DataFrame 聚合

Posted

技术标签:

【中文标题】Spark DataFrame 聚合【英文标题】:Spark DataFrame aggregation 【发布时间】:2015-12-25 11:23:17 【问题描述】:

我有以下代码:

public class IPCCodes 

public static class IPCCount implements Serializable 
    public IPCCount(long permid, int year, int count, String ipc) 
        this.permid = permid;
        this.year = year;
        this.count = count;
        this.ipc = ipc;
    

    public long permid;
    public int year;
    public int count;
    public String ipc;


public static void main(String[] args) 
    SparkConf sparkConf = new SparkConf().setAppName("IPC codes");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

    DataFrame df = sqlContext.sql("SELECT * FROM test.some_table WHERE year>2004");
    JavaRDD<Row> rdd = df.javaRDD();
    JavaRDD<IPCCount> map = rdd.flatMap(new FlatMapFunction<Row, IPCCount>() 
        @Override
        public Iterable<IPCCount> call(Row row) throws Exception 
            List<IPCCount> counts = new ArrayList<>();
            try 
                String codes = row.getString(7);
                for (String s : codes.split(",")) 
                    if(s.length()>4)
                        counts.add(new IPCCount(row.getLong(4), row.getInt(6), 1, s.substring(0, 4)));
                    
                
             catch (NumberFormatException e) 
                System.out.println(e.getMessage());
            
            return counts;
        
    );

我从 Hive 表创建了 DataFrame 并应用 flatMap 函数来拆分 ipc 代码(此字段是 hive 表中的字符串数组),之后我需要聚合代码,其中包含 permid 和 year 的计数,结果表应该是 permid/year/ IPC/计数。

最有效的方法是什么?

【问题讨论】:

一开始我不会flatMap。不离开 Spark SQL 应该很容易解决。但一些细节(如模式和示例输入)会很有用。 表模式是 (patnum string,pan string,assignee string,perm_assignee string,permid bigint,weight int,year int,ipc string,manualchem string,manualelec string,manualeng string,pubdate string,appdate string ,expandclass string) 数据样本:('AT83028261','1984170449','Some Company','SOME COMPANY CO',4295903113,64,2009, 'A61K000718,A61K000716,A61K000821','','','', '','','') 【参考方案1】:

如果您想将DataFrame 作为输出,则没有充分的理由使用RDDflatMap。据我所知,一切都可以使用基本的 Spark SQL 函数轻松处理。使用 Scala:

import org.apache.spark.sql.functions.col, explode, length, split, substring

val transformed = df
  .select(col("permid"), col("year"),
    // Split ipc and explode into multiple rows
    explode(split(col("ipc"), ",")).alias("code")) 
  .where(length(col("code")).gt(4)) // filter
  .withColumn("code", substring(col("code"), 0, 4))

transformed.groupBy(col("permid"), col("year"), col("code")).count

【讨论】:

这非常适合我!谢谢!它还鼓励我在项目中使用 Scala 而不是 Java。

以上是关于Spark DataFrame 聚合的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame 聚合

Spark DataFrame:计算行均值(或任何聚合操作)

Spark SQL读写方法

如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合

Spark DataFrame 按键将列值聚合到 List 中

Spark Scala 聚合组 Dataframe