如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?

Posted

技术标签:

【中文标题】如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?【英文标题】:How to unit test Kafka 0.9 operator with Apache Apex? 【发布时间】:2016-04-22 14:56:54 【问题描述】:

从 users@apex.incubator.apache.org 转发

我想使用支持 0.9 版本协议的新 Kafka Operator 运行单元测试代码。

在这个过程中,我包含了 Malhar-Kafka 库版本 ( 3.3.1-incubating ) 并使用 Apex 引擎 ( 版本 3.3.0 ) 作为测试/提供。

编译工作正常,但我的单元测试无法正常运行,出现“java.lang.ClassNotFoundException: com.datatorrent.lib.util.KryoCloneUtils”异常。

使用与 Apex 引擎集成的 Kafka 0.9 运算符运行单元测试的推荐方法是什么?我假设 Malhar-contrib 库 Kafka 运算符不兼容 0.9 ..

单元测试代码是这样的:

CassandraEventDetailsS​​treamingApp 类扩展了以下 sn-p 代码中的 AbstractKafkaInputOperator。

异常出现在方法 lma.getController();

@Test
public void testApplication() throws IOException, Exception 
    try 
        LocalMode lma = LocalMode.newInstance();
        Configuration conf = new Configuration(false);
        conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
        lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
        LocalMode.Controller lc = lma.getController();
        lc.run();
     catch (ConstraintViolationException e) 
        Assert.fail("constraint violations: " + e.getConstraintViolations());
    

【问题讨论】:

【参考方案1】:

我能够通过从 Apex-engine 、 apex-api 的依赖部分中排除 Malhar-library 和 Malhar-contrib 的依赖项来解决问题。

这使得 Malhar 的 3.3.1-incubating 版本进入类路径,随后 Malhar-Kafka 库与 3.3.1-incubating 版本)。

【讨论】:

以上是关于如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?的主要内容,如果未能解决你的问题,请参考以下文章

重命名 Apache Apex 应用程序时,Kafka 运算符语义会发生变化吗?

将输入运算符动态添加到正在运行的 Apache Apex 应用程序

如何在 Apache Apex 中使用 JSON 创建 DAG?

如何从 Apache Apex 应用程序内部获取 ApplicationID?

如何重新启动 Apache Apex 应用程序?

如何在 DAG 中使用 Apache Apex Malhar RabbitMQ 运算符