Java spark使用reduceByKey避免嵌套列表将对象连接到一个列表中

Posted

技术标签:

【中文标题】Java spark使用reduceByKey避免嵌套列表将对象连接到一个列表中【英文标题】:Java spark concatenate object into one list using reduceByKey aviod nested list 【发布时间】:2021-06-02 05:27:17 【问题描述】:

我有一个带有 (key,value) 对的 java spark rdd 作为 (String, ArrayList)。 ArrayList 是一个 Object 数组(可以是 int、double、string 或 array 或任何其他类型)。

例如输入如下:

key1, [R1_Entry_1, R1_Entry_2, ..., R1_Entry_n]
key1, [R2_Entry_1, R2_Entry_2, ..., R2_Entry_n]
key1, [R3_Entry_1, R3_Entry_2, ..., R3_Entry_n]
key2, [R4_Entry_1, R4_Entry_2, ..., R4_Entry_n]
... 
keyJ, [RK_Entry_1, RK_Entry_2, ..., RK_Entry_n]
... 
keyX, [RM_Entry_1, RM_Entry_2, ..., RM_Entry_n]

那么RI_entry_J(行 I 和条目 J)是一个 java.lang.Object,可以是 int、double、string、ArrayList 或任何其他类型。

我使用了 JavaRDD.reduceByKey() 来减少进入新列表的条目。我想要的输出应该是(假设key1 只有三个对应的行(前三行:R1、R2、R3)。)

key1, [[R1_Entry_1, R2_Entry_1,R3_Entry_1], [R1_Entry_2, R2_Entry_2,R3_Entry_2], ..., [R1_Entry_n, R2_Entry_n,R3_Entry_n]]
... 
keyJ, [[RK_Entry_1, R*_Entry_1, ...], [RK_Entry_2, R*_Entry_2, ...], ..., [RK_Entry_n, R*_Entry_n, ...]]
... 

我的代码如下:

JavaPairRDD<String, ArrayList> AdjJavaRDD =  JavaRDD.reduceByKey(new Function2<ArrayList, ArrayList, ArrayList>() 
@Override
public ArrayList call(ArrayList v1, ArrayList v2) throws Exception 

    int v1Len = v1.size();
    int v2Len = v2.size();
    if (v1Len != v2Len) 
        System.out.println(" \n The input size is incorrect. Please check! \n  ");
        System.exit(0);
    

    List<Object> obj = new ArrayList<Object>(v1Len);
    for (int i =0; i < v1Len; i++)
    
        List<Object> obj_i = new ArrayList<>();
        Object v1i = v1.get(i);
        Object v2i = v2.get(i);
        obj_i.add(v1i);
        obj_i.add(v2i);
        obj.add(i, obj_i);
    
    return new ArrayList(obj);

);

我得到如下结果: key1, [[[R1_Entry_1, R2_Entry_1],R3_Entry_1], [[R1_Entry_2, R2_Entry_2],R3_Entry_2], ..., [[R1_Entry_n, R2_Entry_n],R3_Entry_n]]

即条目嵌套在列表中,而不是写入没有嵌套的列表。

例如,如果Entry_1 是整数,并且KeyJ 有五个对应的行,五个Entry_11, 2, 3,4, 5KeyJ 使用我的代码的结果是 KeyJ, [[[[[1,2],3], 4], 5], ...](在实际计算中,整数的顺序是随机的)。 然而,我想要的是 KeyJ, [[1, 2, 3, 4, 5], ...]

有什么想法可以避免在 Java 中的 Spark 代码中出现这种嵌套列表吗?

【问题讨论】:

上面的代码可能根本不工作,我正在努力解决。 【参考方案1】:

我有办法。上述代码不起作用的原因是Object v1i = v1.get(i); 将返回一个特定对象,例如v1i 首次调用时为 String(或任何其他数据类型)。但是,之后调用时,v1i 变成了ArrayList,这就是问题中的代码返回嵌套列表的原因。

更新后的解决方案如下:

  JavaPairRDD<Tuple2, ArrayList> JavaRDDColumns = adjJavaRDD.reduceByKey(new Function2<ArrayList, ArrayList, ArrayList>() 
            @Override
            public ArrayList call(ArrayList v1, ArrayList v2) throws Exception 
                int v1Len = v1.size();
                int v2Len = v2.size();

                Object[] objArr = new Object[v1Len];
                for (int i = 0; i < v1Len; i++) 
                    ArrayList<Object> obj_i = new ArrayList<Object>();
                    Object obj1i = v1.get(i);
                    Object obj2i = v2.get(i);
                    List<Object> obj1Arr = new ArrayList<>();
                    List<Object> obj2Arr = new ArrayList<>();
                    if (obj1i instanceof List<?>) 
                        obj1Arr = (List) obj1i;
                     else 
                        obj1Arr = Arrays.asList(obj1i);  // not a Arraylist, convert to a Arraylist
                    

                    if (obj2i instanceof List<?>) 
                        obj2Arr = (List) obj2i;
                     else 
                        obj2Arr = Arrays.asList(obj2i);
                    
                    List<Object> combineList = ListUtils.union(obj1Arr, obj2Arr);
                    objArr[i] = combineList;
                
                ArrayList<Object> obj = new ArrayList<>(Arrays.asList(objArr));
                return obj;
            
        );

【讨论】:

以上是关于Java spark使用reduceByKey避免嵌套列表将对象连接到一个列表中的主要内容,如果未能解决你的问题,请参考以下文章

Spark - groupByKey over reduceByKey 的用例是啥

Apache Spark 转换:groupByKey vs reduceByKey vs aggregateByKey

groupByKey,reduceByKey,sortByKey算子-Java&Python版Spark

Spark入门--Spark的reduce和reduceByKey

Spark中的treeReduce与reduceByKey

在Spark中关于groupByKey与reduceByKey的区别