如何使用 Kafka Streams 为应用程序编写单元测试用例
Posted
技术标签:
【中文标题】如何使用 Kafka Streams 为应用程序编写单元测试用例【英文标题】:How to write unit test case for application using Kafka Streams 【发布时间】:2021-08-21 14:52:06 【问题描述】:我们正在构建一个将使用 Kafka Streams 的应用程序。我们正在寻找示例示例,该示例向我们展示了如何为场景编写测试用例,其中我们从 Kafka 拓扑中调用外部服务。 基本上需要以某种方式模拟外部调用,因为服务可能不会一直运行。我们正在使用 TopologyTestDriver 编写测试用例。由于这个外部调用,我们的测试用例没有执行。出现错误:org.springframework.web.client.ResourceAccessException:对“http://localhost:8080/endPointName”的 POST 请求出现 I/O 错误:连接被拒绝:连接;嵌套异常是 java.net.ConnectException: Connection denied: connect
我们要为其编写测试用例的示例代码:
@Bean
public RestTemplate restTemplate()
return new RestTemplate();
public void method(StreamsBuilder builder)
builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
.peek((s, customObj) -> LOG.info(customObj))
.mapValues(this::getResult)
.peek((s, result) -> LOG.info(result))
.to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
private Result getResult(Custom customObj)
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
HttpEntity<Custom> request = new HttpEntity<>(customObj, headers);
return restTemplate.postForEntity(restCompleteUri, request, Result.class).getBody();
示例测试用例示例:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
public class TopologyTest
private static TopologyTestDriver topologyTestDriver;
private static final Logger LOG = LogManager.getLogger();
@Autowired
private ConfigurableApplicationContext appContext;
@BeforeAll
void setUp()
Properties properties = getProperties();
StreamsBuilder builder = new StreamsBuilder();
appContext.getBean(PublisherSubscriberTopology.class).withBuilder(builder);
Topology topology = builder.build();
topologyTestDriver = new TopologyTestDriver(topology, properties);
private Properties getProperties()
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogDeserializationExceptionHandler.class.getName());
properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class.getName());
return properties;
@Test
void testAppFlow()
Custom customObj = getCustomObj();
Result result = getResult();
ConsumerRecordFactory<String, Custom> resultFactory =
new ConsumerRecordFactory<>(inTopic,
StreamsSerdes.String().serializer(), StreamsSerdes.CustomSerde().serializer());
topologyTestDriver.pipeInput(resultFactory.create(
inTopic,
"1001",
customObj
));
ProducerRecord<String, Result> record =
topologyTestDriver.readOutput(
outputTopic,
StreamsSerdes.String().deserializer(),
StreamsSerdes.ResultSerde().deserializer()
);
assertAll(() -> assertEquals("abc123", record.value().getABC()));
private Custom getCustomObj()
Custom customObj = new Custom();
//setting customObj
return customObj;
private Result getResult()
Result result = new Result();
//setting resultObj
return result;
@AfterAll
static void tearDown()
try
topologyTestDriver.close();
catch (Exception e)
LOG.error(e.getMessage());
【问题讨论】:
【参考方案1】:在这种特殊情况下,考虑重构现有代码 - 将对 HTTP 的调用抽象到某个接口并模拟它。由于您无论如何都在使用spring,因此注入将与HTTP一起使用的bean,而不是调用
public void method(StreamsBuilder builder)
builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
.peek((s, customObj) -> LOG.info(customObj))
.mapValues(this::getResult)
.peek((s, result) -> LOG.info(result))
.to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
private Result getResult(Custom customObj)
... HTTP call here ...
使用这样的东西:
class Whatever
@Autowired
private HttpFacade httpFacade;
public void method(StreamsBuilder builder) ...
private Result getResult(Custom customObj)
// httpFacade is int
httpFacade.callRemoteService(customObj);
@Component
class HttpFacade
public ABC callRemoteService(CustomObj)
... here comes the code that works with HttpClient
使用此设置,您可以在测试中模拟 HttpFacade(如果您在没有 spring 的情况下运行单元测试,则使用 @MockBean 或普通模拟) 并指定期望。
这是给你的具体案例。
一般来说,如果您必须测试 Http 请求是否填充了正确的 URL、标头、正文等。您可以使用 WireMock。
对于 Kafka Streams,由于它是 Kafka 的客户端库,您可以在测试前启动 Kafka docker Test Container(可能还有 Zookeeper),设置它以创建所需的主题,然后就可以开始了。
如果您想测试真正的 kafka 交互并且确实想确保消息到达 kafka 主题,然后被您的消费者消费等,这很有意义,换句话说,更复杂的情况。
如果您使用的是 spring kafka,也可以选择使用 Embedded Kafka,但我不确定它是否适用于 kafka 流 - 您应该尝试一下,但至少它是一个“可行”的方向
更新(基于 op 的评论):
在 spring 驱动测试中使用模拟 bean 时,您必须指定对该 bean 的期望:
@SpringBootTest // or whatever configuration you have to run spring driven test here
public class MyTest
@MockBean
private HttpFacade httpFacade;
@Test
public void myTest()
Mockito.when(httpFacade).callRemoteService(eq(<YOUR_EXPECTED_CUSTOM_OBJECT_1>)).thenReturn(<EXPECTED_RESPONSE_1);
... its possible to specify as many expectations as you wish...
... run the test code here that you probably already have...
关键是您实际上并不需要进行 HTTP 调用来测试您的 kafka 流代码!
【讨论】:
您能否分享/告诉修改后的测试用例在这种情况下将如何变成(在我的测试类-TopologyTest 中)。基本上如何在 TopologyTest 的测试用例中使用 Mock 端点? @vivek075 - 我已经更新了答案,请阅读并告诉我它是否适合您的需求 感谢您的回答。问题仍然存在,因为当执行将调用休息端点的拓扑时,瓶颈就在那里。如何触发模拟的休息端点。如果我不进行 http 调用(模拟),那么如何对 kafka 流进行单元测试;作为 rest 端点返回的结果将被推送到输出主题。 这正是模拟的重点。在这种情况下,如果您关心术语,那么准确地说是存根。它用于指定期望。因此,您“模拟”来自远程 HTTP 服务器的答案,这些答案无论如何都超出了您的测试上下文。您无需测试 HTTP 服务器是否正常工作,而是测试 kafka 流代码是否根据存根 HTTP 调用的结果执行其需要执行的操作。 拓扑执行时,该存根将如何注入,它不会执行实际的rest端点吗?如果你能告诉我我们如何修改我的@Test 案例以利用那个模拟端点?【参考方案2】:该问题与 Kafka Streams 无关。您依赖于拓扑中间的 HTTP 客户端(顺便说一下,不推荐),因此您需要询问如何测试它。
您需要注入一个模拟的RestTemplate
和restCompleteUri
变量来与一些假HTTP 端点进行通信。
例如,WireMock 或查看这些帖子
How do I mock a REST template exchange?
How to mock RestTemplate in Java Spring?
【讨论】:
你能告诉我推荐的方法是什么。基本上它是一个无状态的 Kafka 流;所以我们不能在 KTable 中维护任何东西。我们调用的其余端点只是一个转换器,它将输入转换为输出。我也知道我们需要模拟休息端点,但如何嵌入到 kafka 拓扑中。 TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关,或者是无状态/有状态,正如我已经回答的那样。如果它“只是一个转换”,那么你应该在本地 JVM 中引入必要的代码来完全做到这一点,而不是使用外部 HTTP 端点 TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关 - 如何?? StreamsBuilder 和 RestTemplate 是相互独立实例化的独立对象。你唯一的要求是两者都是非空的,但两者都可以被模拟/伪造 那么我应该假设它目前无法实现,还是 Kafka Streams 没有提供足够的功能来为这些场景编写单元测试用例?我对 Kafka Streams 很陌生。因此提出这些问题。如果您发现我的理解不正确,我们深表歉意。如果可行,请纠正我?以上是关于如何使用 Kafka Streams 为应用程序编写单元测试用例的主要内容,如果未能解决你的问题,请参考以下文章
如何限制kafka-streams中的rocksdb内存使用量
如何在单个 Kafka Streams 应用程序中连接到多个集群?
可以将 Kafka Streams 配置为等待 KTable 加载吗?