如何使用 Kafka Streams 为应用程序编写单元测试用例

Posted

技术标签:

【中文标题】如何使用 Kafka Streams 为应用程序编写单元测试用例【英文标题】:How to write unit test case for application using Kafka Streams 【发布时间】:2021-08-21 14:52:06 【问题描述】:

我们正在构建一个将使用 Kafka Streams 的应用程序。我们正在寻找示例示例,该示例向我们展示了如何为场景编写测试用例,其中我们从 Kafka 拓扑中调用外部服务。 基本上需要以某种方式模拟外部调用,因为服务可能不会一直运行。我们正在使用 TopologyTestDriver 编写测试用例。由于这个外部调用,我们的测试用例没有执行。出现错误:org.springframework.web.client.ResourceAccessException:对“http://localhost:8080/endPointName”的 POST 请求出现 I/O 错误:连接被拒绝:连接;嵌套异常是 java.net.ConnectException: Connection denied: connect

我们要为其编写测试用例的示例代码:

@Bean
public RestTemplate restTemplate() 
    return new RestTemplate();


public void method(StreamsBuilder builder) 
    builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
        .peek((s, customObj) -> LOG.info(customObj))
        .mapValues(this::getResult)
        .peek((s, result) -> LOG.info(result))
        .to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));


private Result getResult(Custom customObj) 
    HttpHeaders headers = new HttpHeaders();
    headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
    HttpEntity<Custom> request = new HttpEntity<>(customObj, headers);
    return restTemplate.postForEntity(restCompleteUri, request, Result.class).getBody();

示例测试用例示例:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
public class TopologyTest 
    private static TopologyTestDriver topologyTestDriver;
    private static final Logger LOG = LogManager.getLogger();

@Autowired
private ConfigurableApplicationContext appContext;

@BeforeAll
void setUp() 
    Properties properties = getProperties();

    StreamsBuilder builder = new StreamsBuilder();
    appContext.getBean(PublisherSubscriberTopology.class).withBuilder(builder);
    Topology topology = builder.build();

    topologyTestDriver = new TopologyTestDriver(topology, properties);


private Properties getProperties() 
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
    properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogDeserializationExceptionHandler.class.getName());
    properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class.getName());
    return properties;


@Test
void testAppFlow() 
    Custom customObj = getCustomObj();
    Result result = getResult();

    ConsumerRecordFactory<String, Custom> resultFactory =
            new ConsumerRecordFactory<>(inTopic,
                    StreamsSerdes.String().serializer(), StreamsSerdes.CustomSerde().serializer());

    topologyTestDriver.pipeInput(resultFactory.create(
            inTopic,
            "1001",
            customObj
    ));

    ProducerRecord<String, Result> record =
            topologyTestDriver.readOutput(
                    outputTopic,
                    StreamsSerdes.String().deserializer(),
                    StreamsSerdes.ResultSerde().deserializer()
            );

    assertAll(() -> assertEquals("abc123", record.value().getABC()));


private Custom getCustomObj() 
    Custom customObj = new Custom();
    //setting customObj
    return customObj;


private Result getResult() 
    Result result = new Result();
    //setting resultObj
    return result;


@AfterAll
static void tearDown() 
    try 
        topologyTestDriver.close();
     catch (Exception e) 
        LOG.error(e.getMessage());
    

【问题讨论】:

【参考方案1】:

在这种特殊情况下,考虑重构现有代码 - 将对 HTTP 的调用抽象到某个接口并模拟它。由于您无论如何都在使用spring,因此注入将与HTTP一起使用的bean,而不是调用

public void method(StreamsBuilder builder) 
    builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
        .peek((s, customObj) -> LOG.info(customObj))
        .mapValues(this::getResult)
        .peek((s, result) -> LOG.info(result))
        .to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));


private Result getResult(Custom customObj) 
   ... HTTP call here ...
  

使用这样的东西:


class Whatever 
  @Autowired
  private HttpFacade httpFacade;

  public void method(StreamsBuilder builder) ...

  private Result getResult(Custom customObj) 
      // httpFacade is int
      httpFacade.callRemoteService(customObj);
   


@Component
class HttpFacade 
   public ABC callRemoteService(CustomObj) 
     ... here comes the code that works with HttpClient
   

使用此设置,您可以在测试中模拟 HttpFacade(如果您在没有 spring 的情况下运行单元测试,则使用 @MockBean 或普通模拟) 并指定期望。

这是给你的具体案例。

一般来说,如果您必须测试 Http 请求是否填充了正确的 URL、标头、正文等。您可以使用 WireMock。

对于 Kafka Streams,由于它是 Kafka 的客户端库,您可以在测试前启动 Kafka docker Test Container(可能还有 Zookeeper),设置它以创建所需的主题,然后就可以开始了。

如果您想测试真正的 kafka 交互并且确实想确保消息到达 kafka 主题,然后被您的消费者消费等,这很有意义,换句话说,更复杂的情况。

如果您使用的是 spring kafka,也可以选择使用 Embedded Kafka,但我不确定它是否适用于 kafka 流 - 您应该尝试一下,但至少它是一个“可行”的方向

更新(基于 op 的评论):

在 spring 驱动测试中使用模拟 bean 时,您必须指定对该 bean 的期望:


@SpringBootTest // or whatever configuration you have to run spring driven test here
public class MyTest 

   @MockBean
   private HttpFacade httpFacade;

   @Test
   public void myTest() 
     Mockito.when(httpFacade).callRemoteService(eq(<YOUR_EXPECTED_CUSTOM_OBJECT_1>)).thenReturn(<EXPECTED_RESPONSE_1);
... its possible to specify as many expectations as you wish...

     ... run the test code here that you probably already have...
   


关键是您实际上并不需要进行 HTTP 调用来测试您的 kafka 流代码!

【讨论】:

您能否分享/告诉修改后的测试用例在这种情况下将如何变成(在我的测试类-TopologyTest 中)。基本上如何在 TopologyTest 的测试用例中使用 Mock 端点? @vivek075 - 我已经更新了答案,请阅读并告诉我它是否适合您的需求 感谢您的回答。问题仍然存在,因为当执行将调用休息端点的拓扑时,瓶颈就在那里。如何触发模拟的休息端点。如果我不进行 http 调用(模拟),那么如何对 kafka 流进行单元测试;作为 rest 端点返回的结果将被推送到输出主题。 这正是模拟的重点。在这种情况下,如果您关心术语,那么准确地说是存根。它用于指定期望。因此,您“模拟”来自远程 HTTP 服务器的答案,这些答案无论如何都超出了您的测试上下文。您无需测试 HTTP 服务器是否正常工作,而是测试 kafka 流代码是否根据存根 HTTP 调用的结果执行其需要执行的操作。 拓扑执行时,该存根将如何注入,它不会执行实际的rest端点吗?如果你能告诉我我们如何修改我的@Test 案例以利用那个模拟端点?【参考方案2】:

该问题与 Kafka Streams 无关。您依赖于拓扑中间的 HTTP 客户端(顺便说一下,不推荐),因此您需要询问如何测试它。

您需要注入一个模拟的RestTemplaterestCompleteUri 变量来与一些假HTTP 端点进行通信。

例如,WireMock 或查看这些帖子

How do I mock a REST template exchange?

How to mock RestTemplate in Java Spring?

【讨论】:

你能告诉我推荐的方法是什么。基本上它是一个无状态的 Kafka 流;所以我们不能在 KTable 中维护任何东西。我们调用的其余端点只是一个转换器,它将输入转换为输出。我也知道我们需要模拟休息端点,但如何嵌入到 kafka 拓扑中。 TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关,或者是无状态/有状态,正如我已经回答的那样。如果它“只是一个转换”,那么你应该在本地 JVM 中引入必要的代码来完全做到这一点,而不是使用外部 HTTP 端点 TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关 - 如何?? StreamsBuilder 和 RestTemplate 是相互独立实例化的独立对象。你唯一的要求是两者都是非空的,但两者都可以被模拟/伪造 那么我应该假设它目前无法实现,还是 Kafka Streams 没有提供足够的功能来为这些场景编写单元测试用例?我对 Kafka Streams 很陌生。因此提出这些问题。如果您发现我的理解不正确,我们深表歉意。如果可行,请纠正我?

以上是关于如何使用 Kafka Streams 为应用程序编写单元测试用例的主要内容,如果未能解决你的问题,请参考以下文章

如何限制kafka-streams中的rocksdb内存使用量

如何在单个 Kafka Streams 应用程序中连接到多个集群?

可以将 Kafka Streams 配置为等待 KTable 加载吗?

Kafka Streams 在 HDFS 上查找数据

为啥 Apache Kafka Streams 使用 RocksDB 以及如何改变它?

Kafka Streams:使用相同的 `application.id` 从多个主题消费