Spark - 使用 foreachpartition 收集分区

Posted

技术标签:

【中文标题】Spark - 使用 foreachpartition 收集分区【英文标题】:Spark - Collect partitions using foreachpartition 【发布时间】:2018-02-15 23:58:27 【问题描述】:

我们正在使用 spark 进行文件处理。我们正在处理相当大的文件,每个文件大约 30 GB,大约 40-50 百万行。这些文件已格式化。我们将它们加载到数据框中。最初的要求是识别符合条件的记录并将它们加载到 mysql。我们能够做到这一点。

要求最近发生了变化。不符合标准的记录现在将存储在备用数据库中。这会导致问题,因为集合的大小太大。我们正在尝试独立收集每个分区并按照此处的建议合并到一个列表中

https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/dont_collect_large_rdds.html

我们不熟悉 scala,因此在将其转换为 Java 时遇到了麻烦。我们如何才能逐个迭代分区并收集?

谢谢

【问题讨论】:

【参考方案1】:

请使用 df.foreachPartition 对每个分区独立执行,不会返回驱动程序。您可以将匹配结果保存到每个执行器级别的数据库中。如果您想在驱动程序中收集结果,请使用不推荐用于您的情况的 mappartitions。

请参考以下链接

Spark - Java - foreachPartition

dataset.foreachPartition(new ForeachPartitionFunction<Row>() 
            public void call(Iterator<Row> r) throws Exception 
                while (t.hasNext())

                    Row row = r.next();
                    System.out.println(row.getString(1));

                
                // do your business logic and load into MySQL.
            
        );

对于地图分区:

// You can use the same as Row but for clarity I am defining this.

public class ResultEntry implements Serializable 
  //define your df properties ..



Dataset<ResultEntry> mappedData = data.mapPartitions(new MapPartitionsFunction<Row, ResultEntry>() 

@Override
public Iterator<ResultEntry> call(Iterator<Row> it) 
  List<ResultEntry> filteredResult = new ArrayList<ResultEntry>();
  while (it.hasNext()) 
   Row row = it.next()
   if(somecondition)
       filteredResult.add(convertToResultEntry(row));
 
return filteredResult.iterator();

, Encoders.javaSerialization(ResultEntry.class));

希望这会有所帮助。

拉维

【讨论】:

感谢您的回复。由于该过程是识别跨分区共有的记录,因此我们需要收集。你能帮忙看看如何使用 mappartitions 或 mappartitionswithindex 吗? 感谢您提供更多详细信息。我们还在挣扎。你能按原样从 scala 翻译成 java 吗?这正是我们需要的,因为它在分区上进行了收集。我们不想在分区内迭代,而是在分区中迭代 for(p if(index = = idx) it else Iterator(), true) val data = partRDD.collect codeFunction2 filterFunc= new Function2, Iterator>() @Override public Iterator call(Integer index, Iterator iterator ) 抛出异常 if(iterator.hasNext()) iterator.next(); if(index == idx) 返回迭代器; 否则返回 new Array().iterator(); ; JavaRDD inputRdd = p.mapPartitionsWithIndex(filterFunc, true);code希望这有帮助

以上是关于Spark - 使用 foreachpartition 收集分区的主要内容,如果未能解决你的问题,请参考以下文章

手把手带你玩转Spark机器学习-使用Spark进行文本处理

科普Spark,Spark是什么,如何使用Spark

手把手带你玩转Spark机器学习-使用Spark构建分类模型

Spark系列

将spark依赖包传入HDFS_spark.yarn.jar和spark.yarn.archive的使用

使用 Java 的 Spark 和 Spark SQL 新手