在 Java Spark 中迭代大型数据集的最快且有效的方法
Posted
技术标签:
【中文标题】在 Java Spark 中迭代大型数据集的最快且有效的方法【英文标题】:Fastest And Effective Way To Iterate Large DataSet in Java Spark 【发布时间】:2019-01-19 05:34:03 【问题描述】:我正在使用以下方法将 spark 数据集转换为哈希图列表, 我的最终目标是构建 json 对象列表或 hashmaps 列表 我在 320 万行上运行此代码
List<HashMap> finalJsonMap = new ArrayList<HashMap>();
srcData.foreachPartition(new ForeachPartitionFunction<Row>()
public void call(Iterator<Row> t) throws Exception
while (t.hasNext())
Row eachRow = t.next();
HashMap rowMap = new HashMap();
for(int j = 0; j < grpdColNames.size(); j++)
rowMap.put(grpdColNames.get(j), eachRow.getString(j));
finalJsonMap.add(rowMap);
);
迭代工作正常,但我无法将 rowMap 添加到 finalJsonMap 中。
最好的方法是什么?
【问题讨论】:
你在集群模式二下测试过这段代码吗?当您的驱动程序和执行程序不同时,恐怕它不起作用。 如下 user10947263 所述,我认为您的设计没有利用 Spark 的分发功能。认为您的代码在许多具有不同内存的不同机器上执行。如果您的最终目标是保存 JSON 文件,请使用 Spark 转换构建您的 JSON 文件并在最后保存 JSON 文件......只需我的 2 美分 【参考方案1】:Spark 确实不是这样工作的。
foreachPartition
中的代码在与原始上下文不同的上下文中执行
List<HashMap> finalJsonMap = new ArrayList<HashMap>();
在这种设置中,您所能做的就是修改本地副本。
这已在 Stack Overflow 上多次讨论,并在 official documentation in the Understanding Closures 部分进行了详细描述。
考虑到所需的结果(即本地集合),除了将代码转换为使用 mapPartitions
和 collect
之外,您实际上无能为力。然而,这在 Spark 中几乎没有效率或惯用语。
我强烈建议您重新考虑您当前的设计。
【讨论】:
以上是关于在 Java Spark 中迭代大型数据集的最快且有效的方法的主要内容,如果未能解决你的问题,请参考以下文章
从 sql server 迁移到大型数据集的 sqlite 的最快方法
pandas.concat 和 numpy.append 的大型数据集的内存错误
pandas.concat 和 numpy.append 的大型数据集的内存错误