简单的 Java Map/Reduce 框架 [关闭]
Posted
技术标签:
【中文标题】简单的 Java Map/Reduce 框架 [关闭]【英文标题】:Simple Java Map/Reduce framework [closed] 【发布时间】:2011-07-12 17:24:49 【问题描述】:谁能告诉我一个简单的开源 Map/Reduce 框架/Java API?似乎没有太多证据表明存在这样的事情,但其他人可能知道不同。
我能找到的最好的当然是 Hadoop MapReduce,但这不符合“简单”标准。我不需要运行分布式作业的能力,只需要让我在多核机器上运行 map/reduce 风格的作业,在单个 JVM 中,使用标准的 Java5 风格并发。
自己写并不难,但我宁愿不用。
【问题讨论】:
我看到了这个视频,它宣布了 Java 8 的新特性。新版本中似乎会有 mapreduce API。 youtube.com/watch?v=47_Em-zc7_Q 我很想知道您目前对这个问题的解决方案是什么。我只是在寻找在单台机器上并行执行 Lists.transform(function) 的快速、简单的方法。 LeoTask 有效。它是一个在多核机器上运行的并行任务和结果聚合框架。 github.com/mleoking/leotask 【参考方案1】:我认为值得一提的是,这些问题是 Java 8 的历史。一个例子:
int heaviestBlueBlock =
blocks.filter(b -> b.getColor() == BLUE)
.map(Block::getWeight)
.reduce(0, Integer::max);
换句话说:单节点 MapReduce 在 Java 8 中可用。
更多详情请见Brian Goetz's presentation about project lambda
【讨论】:
假设它成功了,是的。历史告诉我们,有趣的东西通常会被淘汰。 @skaffman:如果 lambda 最终不能成功,我会哭的!! 我(很晚)接受这个答案,因为随着 Java8 的采用,其他所有选项都会很快变得不合时宜。 @skaffman:是的。幸运的是,2.5 年前你的评论是不对的 :)【参考方案2】:你可以试试 LeoTask:一个并行任务运行和结果聚合框架
它是免费和开源的:https://github.com/mleoking/leotask
这里简单介绍一下它的API:https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true
它是一个轻量级框架,在使用其所有可用 CPU 内核的单台计算机上工作。
它具有以下特点:
自动和并行参数空间探索 灵活且基于配置的结果聚合 只关注关键逻辑的编程模型 可靠的自动中断恢复和实用程序:
动态和可克隆的网络结构。 与 Gnuplot 集成 根据常见网络模型生成网络 DelimitedReader:一个复杂的阅读器,可以像数据库一样探索 CSV(逗号分隔值)文件 基于 Mersenne Twister 算法的快速随机数生成器 ImageJ 项目中的集成 CurveFitter【讨论】:
这是一则广告。【参考方案3】:我喜欢在 Java 中使用 Skandium 来实现并行性。该框架为具有共享内存的多核机器实现了某些并行模式(即主从、Map/Reduce、Pipe、Fork 和分治)。这种技术被称为“算法骨架”。模式可以嵌套。
详细有骨骼和肌肉。肌肉做实际的工作(分裂、合并、执行和调节)。骨架表示并行模式,但“While”、“For”和“If”除外,它们在嵌套模式时很有用。
可以在框架内找到示例。我需要一点了解如何使用肌肉和骨骼,但在克服了这个障碍之后,我真的很喜欢这个框架。 :)
【讨论】:
这并不需要积极开发。 悲伤,但确实如此。几天前想访问他们的网站,似乎他们在今年年初就取消了它。所以如果没有人觉得有义务自己维护这个包(它是开源的),就不会有任何更新。也许下次我会寻找替代品,但我真的很满意。【参考方案4】:在 Hazelcast v3.2 中引入了 MapReduce API(请参阅MapReduce API section in the docs)。虽然 Hazelcast 旨在用于分布式系统,但它在单节点设置中运行良好,而且相当轻量级。
【讨论】:
【参考方案5】:你可能想看看Functionals 4 Java的项目网站:http://f4j.rethab.ch/它在8之前的java版本中引入了filter、map和reduce。
【讨论】:
【参考方案6】:您查看Akka 了吗?虽然 akka 确实是一个基于分布式 Actor 模型的并发框架,但您可以用很少的代码实现很多事情。用它很容易将工作分成几部分,它会自动充分利用多核机器,以及能够使用多台机器来处理工作。与使用线程不同,它对我来说感觉更自然。
我有一个使用 akka 的 Java map reduce example。这不是最简单的 map reduce 示例,因为它使用了期货;但它应该让您大致了解所涉及的内容。我的 map reduce 示例展示了几件主要的事情:
如何分工。 如何分配工作:akka 有一个非常简单的消息传递系统和一个工作分区器,您可以配置其日程安排。一旦我学会了如何使用它,我就停不下来了。它是如此简单和灵活。我很快就使用了我的所有四个 CPU 内核。这对于实现服务非常有用。 如何知道工作何时完成以及结果已准备好处理:这实际上是最难理解的部分,除非您已经熟悉 Futures。您不需要使用 Futures,因为还有其他选项。我只是使用它们,因为我想要一些更短的东西让人们去摸索。如果您有任何问题,*** 实际上有一个很棒的 akka QA 部分。
【讨论】:
【参考方案7】:几年前,当我得到一台 8 核机器时,我为自己创建了一次性设备,但我对它并不十分满意。我从来没有像我希望的那样简单易用,而且内存密集型任务也不能很好地扩展。
如果你没有得到任何真实的答案,我可以分享更多,但它的核心是:
public class LocalMapReduce<TMapInput, TMapOutput, TOutput>
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Iterator<TMapInput> inputIterator)
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
while (inputIterator.hasNext())
TMapInput m = inputIterator.next();
Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
futureSet.add(f);
Thread.sleep(10);
while (!futureSet.isEmpty())
Thread.sleep(5);
for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();)
Future<TMapOutput> f = fit.next();
if (f.isDone())
fit.remove();
TMapOutput x = f.get();
m_reducer.reduce(x);
return m_reducer.getResult();
编辑:根据评论,下面是没有sleep
的版本。诀窍是使用CompletionService
,它本质上提供了一个已完成Future
s 的阻塞队列。
public class LocalMapReduce<TMapInput, TMapOutput, TOutput>
private int m_threads;
private Mapper<TMapInput, TMapOutput> m_mapper;
private Reducer<TMapOutput, TOutput> m_reducer;
...
public TOutput mapReduce(Collection<TMapInput> input)
ExecutorService pool = Executors.newFixedThreadPool(m_threads);
CompletionService<TMapOutput> futurePool =
new ExecutorCompletionService<TMapOutput>(pool);
Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
for (TMapInput m : input)
futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
pool.shutdown();
int n = futureSet.size();
for (int i = 0; i < n; i++)
m_reducer.reduce(futurePool.take().get());
return m_reducer.getResult();
我还要注意这是一个非常精炼的 map-reduce 算法,包括一个同时执行 reduce 和合并操作的 reduce worker。
【讨论】:
缺少对reduce值的key排序,所以reduce部分没有像Hadoop那样并行化。 @yura:确实。这是我不想担心的那种微调的微妙之处。 @Chris 好的,那还有什么更好的呢?我已经有一段时间没有做过任何专业的 Java 工作了——有没有关于“好的”并发技术的参考资料? 看看download.oracle.com/javase/tutorial/essential/concurrency/…。一旦你理解了它,你就不需要在这个上下文中再次使用 Thread.sleep 了:) @xan,你为什么要写一个没有睡眠的版本?是因为睡眠是 CPU 密集型的吗?【参考方案8】:你看过GridGain吗?
【讨论】:
GridGain 非常好,也许是最好的,但是非常昂贵,而且他们不支持社区版。甚至社区版 3.6 的文件也无法下载。出于简单的目的,我不推荐电网增益。就像你有一个大项目和一家非常非常大的公司一样。因此,我推荐 Akka。 他们在 2014 年 3 月重新开源。【参考方案9】:我意识到这可能是事后的事,但您可能想看看 JDK7 中的 JSR166y ForkJoin 类。
有一个可以在 JDK6 下正常运行的后向移植库,因此您不必等到下一个千年才能使用它。它位于原始执行程序和 hadoop 之间的某个位置,为在当前 JVM 中处理 map reduce 作业提供了一个框架。
【讨论】:
【参考方案10】:我使用如下结构
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);
List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
reduce(future);
【讨论】:
嗯...这不是 map-reduce,那只是一个赤裸裸的执行者。 你想要简单。循环将工作映射到tasks
任务,并可用于组合或减少单个结果。可选地,结果可以存储在 Future 中。
我意识到我可以编写自己的 map/reduce 框架,但我不想想要。想要使用现成的通用解决方案已经足够复杂了。
@skaffman,您想要的东西比最简单的解决方案更复杂,但比完整的解决方案更简单。一个金锁解决方案。 ;) 也许你可以说出你的最低要求是什么。
介于“执行者”和“hadoop”之间。我愿意接受这之间的所有建议。以上是关于简单的 Java Map/Reduce 框架 [关闭]的主要内容,如果未能解决你的问题,请参考以下文章
Spark streaming storm map reduce区别与联系