使用 Java 并发 API 对动态数据流进行建模的技术

Posted

技术标签:

【中文标题】使用 Java 并发 API 对动态数据流进行建模的技术【英文标题】:Techniques for modeling a dynamic dataflow with Java concurrency API 【发布时间】:2011-02-21 07:47:19 【问题描述】:

编辑:这基本上是一个“如何在 Java 中正确实现数据流引擎”的问题,我觉得这个问题无法在一个答案中得到充分回答(就像问“如何正确实现 ORM 层”和让某人写出 Hibernate 或其他内容的详细信息),因此请考虑“关闭”这个问题。

有没有一种优雅的方式来为 Java 中的动态数据流建模?通过数据流,我的意思是有各种类型的任务,这些任务可以任意“连接”,这样当一个任务完成时,后继任务将使用完成的任务输出作为输入并行执行,或者当多个任务完成时,它们的输出在后续任务中聚合(请参阅flow-based programming)。通过动态,我的意思是任务完成时后继任务的类型和数量取决于该已完成任务的输出,例如,如果任务 A 具有特定输出,则可能会生成任务 B,但如果有,则可能会生成任务 C不同的输出。另一种说法是,每个任务(或一组任务)负责确定下一个任务是什么。

用于渲染网页的示例数据流:我有任务类型:文件下载器、html/CSS 渲染器、HTML 解析器/DOM 构建器、图像渲染器、javascript 解析器、JavaScript 解释器。

HTML 文件的文件下载器任务 HTML 解析器/DOM 构建器任务 每个嵌入文件/链接的文件下载器任务 如果是图片,图片渲染器 如果是外部 JavaScript,JavaScript 解析器 JavaScript 解释器 否则,只需存储在 HTML 解析器任务中的某个 var/field 中 每个嵌入脚本的 JavaScript 解析器 JavaScript 解释器 等待上述任务完成,然后是 HTML/CSS 渲染器(显然不是最佳或完全正确,但这很简单)

我并不是说解决方案需要一些全面的框架(事实上,越接近 JDK API 越好),我绝对不想要像 Spring Web Flow 或一些声明性标记这样重量级的东西或其他 DSL。

更具体地说,我试图想出一种在 Java 中使用 Callables、Executors、ExecutorCompletionServices 以及可能还有各种同步器类(如 Semaphore 或 CountDownLatch)来对此进行建模的好方法。有几个用例和要求:

    不要对任务将在哪个执行器上运行做出任何假设。事实上,为了简化,假设只有一个执行者。它可以是一个固定的线程池执行器,所以一个简单的实现可能会导致死锁(例如,想象一个任务提交另一个任务然后阻塞直到该子任务完成,现在想象其中几个任务用尽了所有线程)。 为简化起见,假设数据不是在任务之间流式传输(任务输出 -> 后续任务输入) - 完成任务和后续任务不必同时存在,因此后续任务的输入数据不会被前面的任务改变(因为它已经完成了)。 数据流“引擎”应该能够处理的操作只有几个:
      一个任务可以排队更多任务的机制 一种机制,在完成所有必需的输入任务之前,后续任务不会排队 一种机制,主线程(或其他不受执行程序管理的线程)阻塞直到流程完成 一种机制,主线程(或其他不受执行程序管理的线程)阻塞直到某些任务完成
    由于数据流是动态的(取决于任务的输入/状态),这些机制的激活应该发生在任务代码中,例如Callable 中的代码本身负责将更多的 Callables 排队。 数据流“内部”不应暴露给任务(可调用对象)本身 - 只有上面列出的操作才可用于任务。 请注意,所有任务的数据类型不一定相同,例如文件下载任务可以接受一个文件作为输入,但会输出一个字符串。 如果任务抛出未捕获的异常(表明某些致命错误需要停止所有数据流处理),它必须尽快传播到启动数据流的线程并取消所有任务(或者更奇特的错误,例如致命错误处理程序)。 应尽快启动任务。这与之前的要求一起应该排除简单的 Future 轮询 + Thread.sleep()。 作为奖励,我希望数据流引擎本身在每次任务完成或自上次任务完成后 X 时间内没有完成时执行一些操作(如日志记录)。比如:ExecutorCompletionService<T> ecs; while (hasTasks()) Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) future.get() ... ...

是否有直接的方法可以使用 Java 并发 API 完成所有这些操作?或者,如果无论 JDK 中有什么可用的东西都会变得复杂,那么是否有一个轻量级的库可以满足要求?我已经有一个适合我的特定用例的部分解决方案(它在某种程度上作弊,因为我使用了两个执行程序,所以你知道,它与我上面给出的网络浏览器示例完全无关),但我'希望看到更通用、更优雅的解决方案。

【问题讨论】:

【参考方案1】:

如何定义接口,例如:

interface Task extends Callable 
  boolean isReady();

然后,您的“数据流引擎”将只需要管理任务对象的集合,即允许将新的任务对象排队等待执行并允许查询给定任务的状态(因此也许上面的接口需要扩展以包括id 和/或类型)。当任务完成时(当然是引擎启动时),引擎必须只查询任何未启动的任务以查看它们现在是否准备就绪,如果是,则将它们传递给在执行程序上运行。正如您所提到的,任何日志记录等也可以在那时完成。

另一件可能有帮助的事情是使用 Guice (http://code.google.com/p/google-guice/) 或类似的轻量级 DI 框架来帮助正确连接所有对象(例如,确保创建正确的执行器类型,并确保任务可以为需要访问数据流引擎(例如,用于其 isReady 方法或用于排队其他任务)的实例提供一个实例,而无需引入复杂的循环关系。

HTH,但如果我遗漏了任何关键方面,请发表评论... 保罗。

【讨论】:

感谢您的回复。嗯,我开始意识到,我认为将任务与连接逻辑(要排队的新任务以及确定是否所有可用输入都准备好)解耦是一个更合适的建模。任务应该是独立的模块,并且应该有一个提供动态连接逻辑的子类或单独的对象(可能是另一个任务)。不确定这里是否真的需要 DI(更不用说 DI 框架) - 应该与此正交。让我试着写一些接口。我在工作 atm,所以我会在几个小时后回到这个问题。【参考方案2】:

看看https://github.com/rfqu/df4j——一个简单但功能强大的数据流库。如果缺少一些所需的功能,可以轻松添加。

【讨论】:

以上是关于使用 Java 并发 API 对动态数据流进行建模的技术的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 中的 Streams API

Java高并发秒杀API之业务分析与DAO层

java并发之同步辅助类Phaser

UML建模图实战(全)

1.28 Java学习系列(二十八)UML建模的理解和图形整理

对12bit的AD建模后matlab写程序对其进行FFT,计算SNR并输出频谱图,程序该怎么写???急