在 Java Spark 中迭代大型数据集的最快且有效的方法

Posted

技术标签:

【中文标题】在 Java Spark 中迭代大型数据集的最快且有效的方法【英文标题】:Fastest And Effective Way To Iterate Large DataSet in Java Spark 【发布时间】:2019-01-19 05:34:03 【问题描述】:

我正在使用以下方法将 spark 数据集转换为哈希图列表, 我的最终目标是构建 json 对象列表或 hashmaps 列表 我在 320 万行上运行此代码

List<HashMap> finalJsonMap = new ArrayList<HashMap>();
    srcData.foreachPartition(new ForeachPartitionFunction<Row>() 
        public void call(Iterator<Row> t) throws Exception 
            while (t.hasNext())
                Row eachRow = t.next();
                HashMap rowMap = new HashMap();
                for(int j = 0; j < grpdColNames.size(); j++) 
                    rowMap.put(grpdColNames.get(j), eachRow.getString(j));  
                
                finalJsonMap.add(rowMap);
            
        
    );

迭代工作正常,但我无法将 rowMap 添加到 finalJsonMap 中。

最好的方法是什么?

【问题讨论】:

你在集群模式二下测试过这段代码吗?当您的驱动程序和执行程序不同时,恐怕它不起作用。 如下 user10947263 所述,我认为您的设计没有利用 Spark 的分发功能。认为您的代码在许多具有不同内存的不同机器上执行。如果您的最终目标是保存 JSON 文件,请使用 Spark 转换构建您的 JSON 文件并在最后保存 JSON 文件......只需我的 2 美分 【参考方案1】:

Spark 确实不是这样工作的。

foreachPartition 中的代码在与原始上下文不同的上下文中执行

List<HashMap> finalJsonMap = new ArrayList<HashMap>();

在这种设置中,您所能做的就是修改本地副本。

这已在 Stack Overflow 上多次讨论,并在 official documentation in the Understanding Closures 部分进行了详细描述。

考虑到所需的结果(即本地集合),除了将代码转换为使用 mapPartitionscollect 之外,您实际上无能为力。然而,这在 Spark 中几乎没有效率或惯用语。

我强烈建议您重新考虑您当前的设计。

【讨论】:

以上是关于在 Java Spark 中迭代大型数据集的最快且有效的方法的主要内容,如果未能解决你的问题,请参考以下文章

从 sql server 迁移到大型数据集的 sqlite 的最快方法

JPA:迭代大型结果集的正确模式是啥?

pandas.concat 和 numpy.append 的大型数据集的内存错误

pandas.concat 和 numpy.append 的大型数据集的内存错误

使用 Java 在 Spark 2.0 中使用数据集的 GroupByKey

Apache Spark - 在java中返回空数据集的映射函数