如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?
Posted
技术标签:
【中文标题】如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?【英文标题】:How to unit test Kafka 0.9 operator with Apache Apex? 【发布时间】:2016-04-22 14:56:54 【问题描述】:从 users@apex.incubator.apache.org 转发
我想使用支持 0.9 版本协议的新 Kafka Operator 运行单元测试代码。
在这个过程中,我包含了 Malhar-Kafka 库版本 ( 3.3.1-incubating ) 并使用 Apex 引擎 ( 版本 3.3.0 ) 作为测试/提供。
编译工作正常,但我的单元测试无法正常运行,出现“java.lang.ClassNotFoundException: com.datatorrent.lib.util.KryoCloneUtils”异常。
使用与 Apex 引擎集成的 Kafka 0.9 运算符运行单元测试的推荐方法是什么?我假设 Malhar-contrib 库 Kafka 运算符不兼容 0.9 ..
单元测试代码是这样的:
CassandraEventDetailsStreamingApp 类扩展了以下 sn-p 代码中的 AbstractKafkaInputOperator。
异常出现在方法 lma.getController();
@Test
public void testApplication() throws IOException, Exception
try
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
LocalMode.Controller lc = lma.getController();
lc.run();
catch (ConstraintViolationException e)
Assert.fail("constraint violations: " + e.getConstraintViolations());
【问题讨论】:
【参考方案1】:我能够通过从 Apex-engine 、 apex-api 的依赖部分中排除 Malhar-library 和 Malhar-contrib 的依赖项来解决问题。
这使得 Malhar 的 3.3.1-incubating 版本进入类路径,随后 Malhar-Kafka 库与 3.3.1-incubating 版本)。
【讨论】:
以上是关于如何使用 Apache Apex 对 Kafka 0.9 运算符进行单元测试?的主要内容,如果未能解决你的问题,请参考以下文章
重命名 Apache Apex 应用程序时,Kafka 运算符语义会发生变化吗?
将输入运算符动态添加到正在运行的 Apache Apex 应用程序
如何在 Apache Apex 中使用 JSON 创建 DAG?