如何在 Java 中的 javaPairRDD 上使用 aggregateByKey?

Posted

技术标签:

【中文标题】如何在 Java 中的 javaPairRDD 上使用 aggregateByKey?【英文标题】:how to use aggregateByKey on javaPairRDD in Java? 【发布时间】:2016-01-04 18:14:26 【问题描述】:

我已经搜索了很多,但我没有找到在 java 代码中执行 aggregateByKey 的示例。

我想查找 JavaPairRDD 中按键减少的行数。

我读到 aggregateByKey 是最好的方法,但我使用的是 Java 而不是 scala,我无法在 Java 中使用它。

请帮忙!!!

例如:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

我想做与我的示例完全相同的操作,我想在输出行中添加一个额外的列,以减少行数。

谢谢!!!

【问题讨论】:

【参考方案1】:

这是我如何在 java 中按键聚合的示例。

JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new  PairFunction<Row, String, Row>() 
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Row> call(Row tblRow) throws Exception 
        String strID= CommonConstant.BLANKSTRING;
        Object[] newRow = new Object[schemaSize];
        for(String s: matchKey)
        
            if(tblRow.apply(finalSchema.get(s))!=null)
                strID+= tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
                                       
           
        int rowSize=    tblRow.length();
        for (int itr = 0; itr < rowSize; itr++)
        
            if(tblRow.apply(itr)!=null)
            
                newRow[itr] = tblRow.apply(itr);
            
        
        newRow[idIndex]= Utils.generateKey(strID);
        return new Tuple2<String, Row>(strID,RowFactory.create(newRow));
    
).aggregateByKey(RowFactory.create(arr), new Function2<Row,Row,Row>()

    private static final long serialVersionUID = 1L;

    public Row call(Row argRow1, Row argRow2) throws Exception 
        // TODO Auto-generated method stub

        Integer rowThreshold=   dataSchemaHashMap.get(CommonConstant.STR_TEMPThreshold);
        Object[] newRow = new Object[schemaSize];
        int rowSize=    argRow1.length();

        for (int itr = 0; itr < rowSize; itr++)
        
            if(argRow1!=null && argRow2!=null)
            
                if(argRow1.apply(itr)!=null && argRow2.apply(itr)!=null)
                
                    if(itr==rowSize-1)
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())+Integer.parseInt(argRow2.apply(itr).toString());
                    else
                        newRow[itr] = argRow2.apply(itr);
                    
                
            
        

        return RowFactory.create(newRow);

    

, new Function2<Row,Row,Row>()
    private static final long serialVersionUID = 1L;

    public Row call(Row v1, Row v2) throws Exception 
        // TODO Auto-generated method stub
        return v1;
    
);

JavaRDD<Row> result1 = result.map(new Function<Tuple2<String,Row>, Row>() 
    private static final long serialVersionUID = -5480405270683046298L;
    public Row call(Tuple2<String, Row> rddRow) throws Exception 
        return rddRow._2();
    
);

【讨论】:

【参考方案2】:

数据文件:average.txt

学生姓名、学科、分数

ss,英文,80

ss,数学,60

GG,英文,180

PP,英文,80

PI,英语,80

GG,数学,100

PP,数学,810

PI,数学,800

问题是在 java 8 中使用aggregateByKey spark 转换找到主题平均。

这是一种方法:

    JavaRDD<String> baseRDD = jsc.textFile("average.txt");
    JavaPairRDD<String,Integer> studentRDD = baseRDD.mapToPair( s -> new Tuple2<String,Integer>(s.split(",")[1],Integer.parseInt(s.split(",")[2])));
    JavaPairRDD<String,Avg> avgRDD = studentRDD.aggregateByKey(new Avg(0,0), (v,x) -> new Avg(v.getSum()+x,v.getNum()+1), (v1,v2) -> new Avg(v1.getSum()+v2.getSum(),v1.getNum()+v2.getNum()));

    Map<String,Avg> mapAvg = avgRDD.collectAsMap();

    for(Entry<String,Avg> entry : mapAvg.entrySet())
        System.out.println(entry.getKey()+"::"+entry.getValue().getAvg());
    



import java.io.Serializable;

public class Avg implements Serializable

private static final long serialVersionUID = 1L;

private int sum;
private int num;

public Avg(int sum, int num)
    this.sum = sum;
    this.num = num;


public double getAvg() return (this.sum / this.num);

public int getSum()    return this.sum;    

public int getNum()        return this.num;    

【讨论】:

【参考方案3】:

我不确定您要做什么,但我可以提供一个解决方案来提供您需要的输出。 AggregateByKey 不会做你期望做的事情,它只是 RDD 的一种组合方式,在 DataFrame 上它的作用与你期望的相似。无论如何,下面的代码可以为您提供所需的输出。

JavaPairRDD<String, Iterable<String>> groups = pairs.groupByKey();

JavaPairRDD<Integer, String> counts = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Integer, String>()

            public Tuple2<Integer, String> call(Tuple2<String, Iterable<String>> arg0) throws Exception 
                HashMap<String, Integer> counts = new HashMap<String, Integer>();
                Iterator<String> itr = arg0._2.iterator();
                String val = null;
                while(itr.hasNext())
                    val = itr.next();
                    if(counts.get(val) == null)
                        counts.put(val, 1);
                    else
                        counts.put(val, counts.get(val)+1);
                    
                

                return new Tuple2(arg0._1, counts.toString());
            

        );

您可以尝试告诉我。请注意,坦率地说,这不是合并,因为合并不会做这种事情。

【讨论】:

以上是关于如何在 Java 中的 javaPairRDD 上使用 aggregateByKey?的主要内容,如果未能解决你的问题,请参考以下文章

Java Spark 如何将 JavaPairRDD<HashSet<String>, HashMap<String, Double>> 保存到文件中?

通过操作 JavaPairRDD 的值 (Sum) 来转换 JavaPairRDD

如何相交不同的JavaPairRDD

如何将 JavaPairRDD 转换为数据集?

如何在 javapairrdd 中使用 containsAll 和 contains 过滤器

如何将数据集转换为 JavaPairRDD?