简单的 Java Map/Reduce 框架 [关闭]

Posted

技术标签:

【中文标题】简单的 Java Map/Reduce 框架 [关闭]【英文标题】:Simple Java Map/Reduce framework [closed] 【发布时间】:2011-07-12 17:24:49 【问题描述】:

谁能告诉我一个简单的开源 Map/Reduce 框架/Java API?似乎没有太多证据表明存在这样的事情,但其他人可能知道不同。

我能找到的最好的当然是 Hadoop MapReduce,但这不符合“简单”标准。我不需要运行分布式作业的能力,只需要让我在多核机器上运行 map/reduce 风格的作业,在单个 JVM 中,使用标准的 Java5 风格并发。

自己写并不难,但我宁愿不用。

【问题讨论】:

我看到了这个视频,它宣布了 Java 8 的新特性。新版本中似乎会有 mapreduce API。 youtube.com/watch?v=47_Em-zc7_Q 我很想知道您目前对这个问题的解决方案是什么。我只是在寻找在单台机器上并行执行 Lists.transform(function) 的快速、简单的方法。 LeoTask 有效。它是一个在多核机器上运行的并行任务和结果聚合框架。 github.com/mleoking/leotask 【参考方案1】:

我认为值得一提的是,这些问题是 Java 8 的历史。一个例子:

int heaviestBlueBlock =
    blocks.filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);

换句话说:单节点 MapReduce 在 Java 8 中可用

更多详情请见Brian Goetz's presentation about project lambda

【讨论】:

假设它成功了,是的。历史告诉我们,有趣的东西通常会被淘汰。 @skaffman:如果 lambda 最终不能成功,我会哭的!! 我(很晚)接受这个答案,因为随着 Java8 的采用,其他所有选项都会很快变得不合时宜。 @skaffman:是的。幸运的是,2.5 年前你的评论是不对的 :)【参考方案2】:

你可以试试 LeoTask:一个并行任务运行和结果聚合框架

它是免费和开源的:https://github.com/mleoking/leotask

这里简单介绍一下它的API:https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true

它是一个轻量级框架,在使用其所有可用 CPU 内核的单台计算机上工作。

它具有以下特点:

自动和并行参数空间探索 灵活且基于配置的结果聚合 只关注关键逻辑的编程模型 可靠的自动中断恢复

和实用程序:

动态和可克隆的网络结构。 与 Gnuplot 集成 根据常见网络模型生成网络 DelimitedReader:一个复杂的阅读器,可以像数据库一样探索 CSV(逗号分隔值)文件 基于 Mersenne Twister 算法的快速随机数生成器 ImageJ 项目中的集成 CurveFitter

【讨论】:

这是一则广告。【参考方案3】:

我喜欢在 Java 中使用 Skandium 来实现并行性。该框架为具有共享内存的多核机器实现了某些并行模式(即主从、Map/Reduce、Pipe、Fork 和分治)。这种技术被称为“算法骨架”。模式可以嵌套。

详细有骨骼和肌肉。肌肉做实际的工作(分裂、合并、执行和调节)。骨架表示并行模式,但“While”、“For”和“If”除外,它们在嵌套模式时很有用。

可以在框架内找到示例。我需要一点了解如何使用肌肉和骨骼,但在克服了这个障碍之后,我真的很喜欢这个框架。 :)

【讨论】:

这并不需要积极开发。 悲伤,但确实如此。几天前想访问他们的网站,似乎他们在今年年初就取消了它。所以如果没有人觉得有义务自己维护这个包(它是开源的),就不会有任何更新。也许下次我会寻找替代品,但我真的很满意。【参考方案4】:

在 Hazelcast v3.2 中引入了 MapReduce API(请参阅MapReduce API section in the docs)。虽然 Hazelcast 旨在用于分布式系统,但它在单节点设置中运行良好,而且相当轻量级。

【讨论】:

【参考方案5】:

你可能想看看Functionals 4 Java的项目网站:http://f4j.rethab.ch/它在8之前的java版本中引入了filter、map和reduce。

【讨论】:

【参考方案6】:

您查看Akka 了吗?虽然 akka 确实是一个基于分布式 Actor 模型的并发框架,但您可以用很少的代码实现很多事情。用它很容易将工作分成几部分,它会自动充分利用多核机器,以及能够使用多台机器来处理工作。与使用线程不同,它对我来说感觉更自然。

我有一个使用 akka 的 Java map reduce example。这不是最简单的 map reduce 示例,因为它使用了期货;但它应该让您大致了解所涉及的内容。我的 map reduce 示例展示了几件主要的事情:

如何分工。 如何分配工作:akka 有一个非常简单的消息传递系统和一个工作分区器,您可以配置其日程安排。一旦我学会了如何使用它,我就停不下来了。它是如此简单和灵活。我很快就使用了我的所有四个 CPU 内核。这对于实现服务非常有用。 如何知道工作何时完成以及结果已准备好处理:这实际上是最难理解的部分,除非您已经熟悉 Futures。您不需要使用 Futures,因为还有其他选项。我只是使用它们,因为我想要一些更短的东西让人们去摸索。

如果您有任何问题,*** 实际上有一个很棒的 akka QA 部分。

【讨论】:

【参考方案7】:

几年前,当我得到一台 8 核机器时,我为自己创建了一次性设备,但我对它并不十分满意。我从来没有像我希望的那样简单易用,而且内存密集型任务也不能很好地扩展。

如果你没有得到任何真实的答案,我可以分享更多,但它的核心是:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> 
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) 
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) 
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        
        while (!futureSet.isEmpty()) 
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) 
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) 
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                
            
        
        return m_reducer.getResult();
    

编辑:根据评论,下面是没有sleep 的版本。诀窍是使用CompletionService,它本质上提供了一个已完成Futures 的阻塞队列。

 public class LocalMapReduce<TMapInput, TMapOutput, TOutput> 
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input) 
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool = 
                  new ExecutorCompletionService<TMapOutput>(pool);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        for (TMapInput m : input) 
            futureSet.add(futurePool.submit(m_mapper.makeWorker(m)));
        
        pool.shutdown();
        int n = futureSet.size();
        for (int i = 0; i < n; i++) 
            m_reducer.reduce(futurePool.take().get());
        
        return m_reducer.getResult();
    

我还要注意这是一个非常精炼的 map-reduce 算法,包括一个同时执行 reduce 和合并操作的 reduce worker。

【讨论】:

缺少对reduce值的key排序,所以reduce部分没有像Hadoop那样并行化。 @yura:确实。这是我不想担心的那种微调的微妙之处。 @Chris 好的,那还有什么更好的呢?我已经有一段时间没有做过任何专业的 Java 工作了——有没有关于“好的”并发技术的参考资料? 看看download.oracle.com/javase/tutorial/essential/concurrency/…。一旦你理解了它,你就不需要在这个上下文中再次使用 Thread.sleep 了:) @xan,你为什么要写一个没有睡眠的版本?是因为睡眠是 CPU 密集型的吗?【参考方案8】:

你看过GridGain吗?

【讨论】:

GridGain 非常好,也许是最好的,但是非常昂贵,而且他们不支持社区版。甚至社区版 3.6 的文件也无法下载。出于简单的目的,我不推荐电网增益。就像你有一个大项目和一家非常非常大的公司一样。因此,我推荐 Akka。 他们在 2014 年 3 月重新开源。【参考方案9】:

我意识到这可能是事后的事,但您可能想看看 JDK7 中的 JSR166y ForkJoin 类。

有一个可以在 JDK6 下正常运行的后向移植库,因此您不必等到下一个千年才能使用它。它位于原始执行程序和 hadoop 之间的某个位置,为在当前 JVM 中处理 map reduce 作业提供了一个框架。

【讨论】:

【参考方案10】:

我使用如下结构

int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList();
for(int i=0;i<tasks;i++)
    results.add(es.submit(new Task(i)));
for(Future<TaskResult> future:results)
    reduce(future);

【讨论】:

嗯...这不是 map-reduce,那只是一个赤裸裸的执行者。 你想要简单。循环将工作映射到tasks 任务,并可用于组合或减少单个结果。可选地,结果可以存储在 Future 中。 我意识到我可以编写自己的 map/reduce 框架,但我不想想要。想要使用现成的通用解决方案已经足够复杂了。 @skaffman,您想要的东西比最简单的解决方案更复杂,但比完整的解决方案更简单。一个金锁解决方案。 ;) 也许你可以说出你的最低要求是什么。 介于“执行者”和“hadoop”之间。我愿意接受这之间的所有建议。

以上是关于简单的 Java Map/Reduce 框架 [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop学习:Map/Reduce初探与小Demo实现

Spark streaming storm map reduce区别与联系

MongoDB下Map-Reduce使用简单翻译及示例

Hadoop Map/Reduce

Python 之内置函数:filter、map、reduce、zip、enumerate

关于在eclipse上能运行Map但无法运行Reduce的解决方法