如何使用 Apex 进行批处理?
Posted
技术标签:
【中文标题】如何使用 Apex 进行批处理?【英文标题】:How to make batch processing with Apex? 【发布时间】:2016-11-28 07:56:32 【问题描述】:如何使用 Apache Apex 创建批处理应用程序?
我发现的所有示例都是流式应用程序,这意味着它们不会结束,我希望我的应用程序在处理完所有数据后关闭。
谢谢
【问题讨论】:
【参考方案1】:您的用例是什么?原生支持批处理已在路线图上,目前正在进行中。
或者,在此之前,一旦您确定处理完成,输入运算符可以发送一个信号作为 ShutdownException(),该信号将通过 DAG 传播并关闭 DAG。
如果您需要更多详细信息,请告诉我们。
【讨论】:
我正在写几乎所有开源大数据处理引擎的比较作为我的硕士论文。我想创建一个顶点批处理部分(符合 Mapreduce、Flink 和 Spark)。我可能会暂时跳过它并继续进行流比较。 当然。使用它的方法是:在您的 endWindow() 调用中,检查您的任务是否完成 - 需要一些自定义逻辑。如果您的任务完成,请调用 ShuddownException() 并且您的整个管道将关闭。【参考方案2】:您可以在运行应用程序之前添加退出条件。 例如
public void testMapOperator() throws Exception
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
= dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
ResultCollector collector = dag.addOperator("collector", new ResultCollector());
dag.addStream("raw numbers", numGen.output, mapper.input);
dag.addStream("mapped results", mapper.output, collector.input);
// Create local cluster
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
//Condition to exit the application
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
@Override
public Boolean call() throws Exception
return TupleCount == NumTuples;
);
lc.run();
Assert.assertEquals(sum, 285);
完整代码请参考https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
【讨论】:
在运行时环境方面一些更通用的解决方案怎么样?我希望可以选择它是本地环境还是集群环境。以上是关于如何使用 Apex 进行批处理?的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Windows 批处理执行驻留在 Oracle APEX 上的 SQL 脚本
Oracle APEX 交互式网格:如何使用 PLSQL 访问内容?