如何使用 Apex 进行批处理?

Posted

技术标签:

【中文标题】如何使用 Apex 进行批处理?【英文标题】:How to make batch processing with Apex? 【发布时间】:2016-11-28 07:56:32 【问题描述】:

如何使用 Apache Apex 创建批处理应用程序?

我发现的所有示例都是流式应用程序,这意味着它们不会结束,我希望我的应用程序在处理完所有数据后关闭。

谢谢

【问题讨论】:

【参考方案1】:

您的用例是什么?原生支持批处理已在路线图上,目前正在进行中。

或者,在此之前,一旦您确定处理完成,输入运算符可以发送一个信号作为 ShutdownException(),该信号将通过 DAG 传播并关闭 DAG。

如果您需要更多详细信息,请告诉我们。

【讨论】:

我正在写几乎所有开源大数据处理引擎的比较作为我的硕士论文。我想创建一个顶点批处理部分(符合 Mapreduce、Flink 和 Spark)。我可能会暂时跳过它并继续进行流比较。 当然。使用它的方法是:在您的 endWindow() 调用中,检查您的任务是否完成 - 需要一些自定义逻辑。如果您的任务完成,请调用 ShuddownException() 并且您的整个管道将关闭。【参考方案2】:

您可以在运行应用程序之前添加退出条件。 例如

public void testMapOperator() throws Exception

   LocalMode lma = LocalMode.newInstance();
   DAG dag = lma.getDAG();

   NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
   FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
    = dag.addOperator("mapper", new  FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
   ResultCollector collector = dag.addOperator("collector", new ResultCollector());

   dag.addStream("raw numbers", numGen.output, mapper.input);
   dag.addStream("mapped results", mapper.output, collector.input);

// Create local cluster
   LocalMode.Controller lc = lma.getController();
   lc.setHeartbeatMonitoringEnabled(false);

 //Condition to exit the application
  ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
  
     @Override
     public Boolean call() throws Exception
    
       return TupleCount == NumTuples;
    
  );

  lc.run();

  Assert.assertEquals(sum, 285);

完整代码请参考https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java

【讨论】:

在运行时环境方面一些更通用的解决方案怎么样?我希望可以选择它是本地环境还是集群环境。

以上是关于如何使用 Apex 进行批处理?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Windows 批处理执行驻留在 Oracle APEX 上的 SQL 脚本

Oracle APEX 交互式网格:如何使用 PLSQL 访问内容?

Apex 数据加载向导完成后如何运行附加进程

如何使用 Apache Apex 将数据从 DB2 批量摄取到 Vertica

使用 PLSQL 更新 Apex 表格形式

如何在没有 Web 源模块的情况下从 Oracle APEX 中的 API 获取数据