将多个文件作为独立的 RDD 并行处理

Posted

技术标签:

【中文标题】将多个文件作为独立的 RDD 并行处理【英文标题】:Processing multiple files as independent RDD's in parallel 【发布时间】:2015-08-10 06:27:40 【问题描述】:

我有一个场景,包括 group by 在内的一定数量的操作必须应用于许多小文件(每个约 300MB)。操作是这样的..

df.groupBy(....).agg(....)

现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,但是,它会创建单个 RDD 并将其分区以进行操作。但是,从操作上看,它是一个分组,并且涉及大量的 shuffle,如果文件互斥,则这是不必要的。

我正在研究的是一种可以在文件上创建独立 RDD 并独立操作它们的方法。

【问题讨论】:

【参考方案1】:

这更像是一个想法,而不是一个完整的解决方案,我还没有测试过。

您可以先将数据处理管道提取到函数中。

def pipeline(f: String, n: Int) = 
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later

如果您的文件很小,您可以调整n 参数以使用尽可能少的分区来容纳单个文件中的数据并避免混洗。这意味着您正在限制并发,但我们稍后会回到这个问题。

val n: Int = ??? 

接下来,您必须获取输入文件的列表。此步骤取决于数据源,但大多数情况下它或多或少是简单的:

val files: Array[String] = ???

接下来你可以使用pipeline函数映射上面的列表:

val rdds = files.map(f => pipeline(f, n))

由于我们将并发限制在单个文件级别,因此我们希望通过提交多个作业来进行补偿。让我们添加一个简单的帮助器来强制评估并用Future包装它

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future 
    df.rdd.foreach(_ => ()) // Force computation
    df

最后我们可以在rdds 上使用上面的助手:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

根据您的要求,您可以添加 onComplete 回调或使用响应式流来收集结果。

【讨论】:

好吧,就我而言,不需要解决方法,因为在这里完全无关紧要。 sqlContext 仅用于驱动程序,因此没有任何理由进行序列化。 @AlexNaspo 不彻底,但我用过一次或两次类似的方法。除非你有太多的内存,否则执行实际操作而不是依赖缓存更有意义。如果您对一般原则感兴趣,请查看org.apache.spark.rdd.AsyncRDDActions【参考方案2】:

如果你有很多文件,并且每个文件都很小(你说 300MB 以上,我认为 Spark 很小),你可以尝试使用SparkContext.wholeTextFiles,它将创建一个 RDD,其中每条记录都是一个完整的文件。

【讨论】:

【参考方案3】:

这样我们就可以并行编写多个RDD

public class ParallelWriteSevice implements IApplicationEventListener 

    private static final IprogramLogger logger = programLoggerFactory.getLogger(ParallelWriteSevice.class);

    private static ExecutorService executorService=null;
    private static List<Future<Boolean>> futures=new ArrayList<Future<Boolean>>();

    public static void submit(Callable callable) 
        if(executorService==null)
        
            executorService=Executors.newFixedThreadPool(15);//Based on target tables increase this
        

        futures.add(executorService.submit(callable));
    

    public static boolean isWriteSucess() 
        boolean writeFailureOccured = false;
        try 
            for (Future<Boolean> future : futures) 
                try 
                    Boolean writeStatus = future.get();
                    if (writeStatus == false) 
                        writeFailureOccured = true;
                    
                 catch (Exception e) 
                    logger.error("Erorr - Scdeduled write failed " + e.getMessage(), e);
                    writeFailureOccured = true;
                
            
         finally 
            resetFutures();         
              if (executorService != null) 
                  executorService.shutdown();
              executorService = null;

        
        return !writeFailureOccured;
    

    private static void resetFutures() 
            logger.error("resetFutures called");
            //futures.clear();
    





【讨论】:

以上是关于将多个文件作为独立的 RDD 并行处理的主要内容,如果未能解决你的问题,请参考以下文章

Java的并发和多处理器的并行的理解

将文件上传为带有集成的春季批处理中的并行过程

如何在并行运行多个文件时在后台运行批处理文件

聊聊C++异步编程-1

如何使用 Pyspark 并行处理多个镶木地板文件?

Spark Streaming:微批处理并行执行