spark RDD union 非常慢

Posted

技术标签:

【中文标题】spark RDD union 非常慢【英文标题】:it is very slow for spark RDD union 【发布时间】:2015-01-05 01:28:39 【问题描述】:

我有 2 个 spark RDD,dataRDD 和 newPairDataRDD,它们用于 spark SQL 查询。 当我的应用程序初始化时,dataRDD 将被初始化。一个指定的 hbase 实体中的所有数据都将存储到 dataRDD。

当客户端的 sql 查询到来时,我的 APP 将获取所有新的更新和插入到 newPairDataRDD。 dataRDD 联合 newPairDataRDD 并在 Spark SQL 上下文中注册为表。

我什至在 dataRDD 中发现了 0 条记录,在 newPairDataRDD 中发现了 1 条新插入记录。联合需要 4 秒。太慢了

我认为这是不合理的。任何人都知道如何使它更快?谢谢 简单代码如下

    // Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row>  dataRDD= getAllBaseDataToJavaRDD();
    dataRDD.cache();
    dataRDD.persist(StorageLevel.MEMORY_ONLY());
    logger.info(dataRDD.count());

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
    // Step3: if count>0 do union and reduce

       if(newPairDataRDD.count() > 0) 

        JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);

    // if data was updated in DB, need to delete the old version from the dataRDD.

        dataRDD = unionedRDD.reduceByKey(
            new Function2<Row, Row, Row>() 
            // @Override
            public Row call(Row r1, Row r2) 
             return r2;
             
            );
    
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);

//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

从 spark web ui,我可以看到下面。显然它需要 4s 来联合

已完成阶段 (8)

StageId 描述 Submitted Duration Tasks: Succeeded/Total Input Shuffle Read Shuffle Write

6 收集于 SparkPlan.scala:85+details 1/4/2015 8:17 2 s 8-Aug 156.0 B

7 union at SparkSqlQueryForMarsNew.java:389+details 1/4/2015 8:17 4 s 8-Aug 64.0 B 156.0 B

【问题讨论】:

【参考方案1】:

实现您想要的更有效的方法是使用cogroup()flatMapValues(),使用联合除了向dataRDD 添加新分区外几乎没有什么作用,这意味着所有数据必须在reduceByKey()cogroup()flatMapValues() 将导致仅重新分区 newPairDataRDD

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() 
        public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) 
            if (grouped._2.nonEmpty()) 
                return grouped._2;
             else 
                return grouped._1;
            
        
    );

或者在 Scala 中

val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues  case (oldVals, newVals) =>
    if (newVals.nonEmpty) newVals else oldVals

免责声明,我不习惯用 Java 编写 spark!以上如有错误请高人指正!

【讨论】:

【参考方案2】:

尝试重新分区您的 RDD:

JavaPairRDD unionedRDD =dataRDD.repartition(sc.defaultParallelism * 3).union(newPairDataRDD.repartition(sc.defaultParallelism * 3));

【讨论】:

以上是关于spark RDD union 非常慢的主要内容,如果未能解决你的问题,请参考以下文章

spark 教程三 spark Map filter flatMap union distinct intersection操作

Spark中将一个RDD严格划分为多个RDD

Spark RDD编程 双Value类型交互

Spark RDD编程 双Value类型交互

Spark算子

Spark-Core RDD转换算子-双Value型交互