Spark没有将负载均匀地分配给任务

Posted

技术标签:

【中文标题】Spark没有将负载均匀地分配给任务【英文标题】:Spark not distributing load to tasks evenly 【发布时间】:2015-12-02 16:49:27 【问题描述】:

RDD.saveAsTextFile 的最后一个阶段非常慢。我怀疑问题在于记录没有均匀地分布到各个分区和任务中。有什么办法可以强制让它们均匀分布吗?

   public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating,
                                                      JavaPairRDD<Integer, Integer> userIdClusterId,
                                                      int numPartitions, String outDir)
    /*
     convert the JavaRDD<Rating>  to JavaPairRDD<Integer,DmRating>
     */
    JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() 
        public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception 
            return new Tuple2<>(dmRating.user(), (DmRating)dmRating);
        
    );

    /*
    join this RDD with userIdClusterID RDD by key
     */
    JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions);

    // extract the clusterId to videoId map
    JavaPairRDD<Integer, Integer> clusterIdVideoId =  userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer,DmRating>>, Integer, Integer>() 
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId ) throws Exception 
            Integer userId = userIdDmRatingClusterId._1();
            Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2();
            return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product());
        
    );
    //////
    /// Count the popularity of a video in a cluster
    JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() 
        @Override
        public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception 
            return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1);
        
    );
    JavaPairRDD<String, Integer> clusterIdVideoIdStrCount =   clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() 
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception 
            return v1+v2;
        
    );
    ///

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() 
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception 
            String[] splits = clusterIdVideoIdStrCount._1().split(":");
            try
                if(splits.length==2)
                    int clusterId = Integer.parseInt(splits[0]);
                    int videoId = Integer.parseInt(splits[1]);
                    return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2()));
                else
                    //Should never occur
                    LOGGER.error("Could not split  into two with : as the separator!", clusterIdVideoIdStrCount._1());
                
            catch (NumberFormatException ex)
                LOGGER.error(ex.getMessage());
            
            return new Tuple2<>(-1, new Tuple2<>(-1,-1));
        
    );

    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey();

    JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() 
        @Override
        public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception 
           return userId_T_clusterIdDmRating._2();
        
    );

    JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions);

    JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() 
        @Override
        public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception 
            int clusterId = v1._1();
            Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2();
            DmRating rating = tuple._1();
            Iterable<Tuple2<Integer, Integer>> videosCounts= tuple._2();
            StringBuilder recosStr = new StringBuilder();
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts)
                if(appendComa) recosStr.append(",");
                recosStr.append("");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("");
                appendComa = true;
            
            String val = String.format("\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[  %s ]", rating.dmUserId,  rating.dmV1stStr, rating.user(), clusterId, recosStr);
            return new Tuple2<Integer, String>(rating.user(), val);
        
    );
    JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions);
    JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() 
        @Override
        public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception 
            for(String str : v1._2())
                return str;
            
            LOGGER.error("Could not fetch a string from iterable so returning empty");
            return "";
        
    );

    //LOGGER.info("Number of items in RDD: ", jsonStringRDD.count());
    //return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2());
    LOGGER.info("Repartitioning the data into ", numPartitions );
    jsonStringRdd.cache().saveAsTextFile(outDir);
    return jsonStringRdd;

集群规模:1. 主节点(Master):16 CPU,32GB 内存;2. 工作节点(Workers)4 个:32 CPU,102GB 内存,4×375GB SSD 硬盘

我把代码改成了使用 DataFrame,但问题依旧。

public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
                                                 int numPartitions,
                                                 JavaSparkContext javaSparkContext,
                                                 String outdir)

    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() 
        public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception 
            //Integer userId = v1._1();
            Tuple2<DmRating, Integer> values = v1._2();
            DmRating rating = values._1();
            Integer clusterId = values._2();
            rating.setClusterId(clusterId);
            rating.setVideoId(rating.product());
            rating.setV1stOrUserId((rating.userId== null || rating.userId.isEmpty())? rating.v1stId : rating.userId);
            rating.setRedisId(rating.user());
            return rating;
            //return String.format("\"clusterId\": %s,\"userId\": %s, \"userId\":\"%s\", \"videoId\": %s", clusterId, userId, rating.userId, rating.product());
        
    );
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class);
    dmRatingDF.registerTempTable("dmrating");
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache();
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId","videoId").count().cache();
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache();
    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD();
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() 
        @Override
        public Boolean call(Row v1) throws Exception 
            //make sure the size and values of the properties are correct
            return !(v1.size()!=3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2));
        
    );

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD.mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() 
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception 
            Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), Long.valueOf(row.getLong(2)).intValue());
            return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0),videoIdCount);
        
    ).cache();
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache();
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() 
        @Override
        public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception 
            ClusterIdVideos row = new ClusterIdVideos();
            Iterable<Tuple2<Integer, Integer>> videosCounts= v1._2();
            StringBuilder recosStr = new StringBuilder();
            recosStr.append("[");
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts)
                if(appendComa) recosStr.append(",");
                recosStr.append("");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("");
                appendComa = true;
            
            recosStr.append("]");
            row.setClusterId(v1._1());
            row.setVideos(recosStr.toString());
            return row;
        
    ).cache();

    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class);
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.write().parquet(outdir);

【问题讨论】:

使用“.repartition(numPartitions: Int)”。它会增加并行度,但这可能无法解决您的问题...您可以发布代码吗? 试过..不影响时间或分区 我们或许可以提供帮助,但需要查看代码。所以请发布代码。 可能不是导致延迟的“saveAsTextFile()”。 “saveAs..”是一个动作,它触发作业中定义的所有其他计算。我建议查看每个阶段,看看哪个操作花费了大部分时间。另外请提供一些示例数据,您的集群大小和数据总大小,我会尝试并尝试优化性能。您还可以启用历史服务器来分析已完成的作业。 【参考方案1】:

好的,找到了问题。罪魁祸首是 groupBy 和 join 操作。 Spark 网站上的文档说

在 Spark 中,数据通常不会恰好按照某个特定操作所需要的方式分布在各个分区上。在计算过程中,单个任务只在单个分区上操作——因此,为了给单个 reduceByKey 的 reduce 任务准备好它要处理的全部数据,Spark 需要执行一次 all-to-all 操作。它必须从所有分区中读取数据以找到所有键的所有值,然后把跨分区的值汇集到一起,计算出每个键的最终结果——这个过程称为 shuffle。

要优化任何 join/groupByKey 操作,目标都应该是减少 shuffle。我发现这份幻灯片(this deck)对诊断问题很有帮助,尤其是第 12 张幻灯片。

我知道 cluster id(聚类簇 ID)这份数据非常小,每次运行只有 100 个簇,所以我为这张较小的表创建了一个广播变量,把它广播给所有执行器(executor),并用该变量来做连接。这很有效,将计算时间从 2 小时缩短到了 10 分钟。

    // Convert the JSON strings to a DataFrame.
    // NOTE(review): presumably groupedFlat holds one JSON document per element here — confirm against the surrounding code.
    DataFrame  groupedClusterId = sqlContext.read().json(groupedFlat.rdd());
    // Wrap the per-cluster DataFrame (the smaller side of the join) in a broadcast variable.
    // NOTE(review): Spark SQL also has a broadcast-join hint (functions.broadcast) — confirm which is appropriate for the Spark version in use.
    Broadcast<DataFrame> broadcastDataFrame= javaSparkContext.broadcast(groupedClusterId);

    // Join the users with the broadcast value on the "clusterId" column and write the result as Parquet to outdir.
    DataFrame recosDf = clusterIdUserIdDF.join(broadcastDataFrame.value(),"clusterId");
    recosDf.write().parquet(outdir);

【讨论】:

以上是关于Spark没有将负载均匀地分配给任务的主要内容,如果未能解决你的问题,请参考以下文章

Spark如何进行动态资源分配

RabbitMQ 将消息不均匀地分配给消费者

如何将 zip 文件的内容分配给 Spark 中的每个任务?

负载均衡的起源

Google研究 | 使用一致的哈希算法分配临界负载

如何在节点上平均分配 slurm 任务?