断言 Kafka 发送工作

Posted

技术标签:

【中文标题】断言 Kafka 发送工作【英文标题】:Assert Kafka send worked 【发布时间】:2019-03-09 23:38:21 【问题描述】:

我正在用 Spring Boot 编写一个应用程序,所以我要写信给 Kafka:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

然后在我的方法里面:

kafkaTemplate.send(topic, data)

但我觉得我只是依靠这个来工作,我怎么知道这是否有效?如果它是异步的,返回 200 代码并希望它确实有效是一个好习惯吗?我糊涂了。如果 Kafka 不可用,这不会失败吗?不应该提示我捕获异常吗?

【问题讨论】:

【参考方案1】:

您可以在向 kafka 发送消息时使用以下命令:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name

当上面的命令运行时,你应该运行你的代码,如果发送消息成功,那么消息必须打印在控制台上。

此外,与任何资源的任何其他连接一样,如果无法建立连接,则执行任何类型的操作都会引发一些异常。

【讨论】:

他在询问如何从 java 代码库中验证这一点,而不是运行外部命令。 @mjuarez 哇,那么这是否意味着如果我们使用基本的 cosnole 消费者它不起作用? :)【参考方案2】:

是的,如果 Kafka 不可用,.send() 调用将失败,但如果您异步发送,则不会通知任何人。您可以指定一个回调,当未来最终完成时要执行。完整的接口规范在这里:https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Callback.html

来自官方 Kafka javadoc:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

完全非阻塞使用可以利用回调参数来 提供在请求完成时将调用的回调。

  ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); 
      producer.send(myRecord,
           new Callback() 
               public void onCompletion(RecordMetadata metadata, Exception e) 
                   if(e != null) 
                      e.printStackTrace();
                    else 
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   
               
           );

【讨论】:

【参考方案3】:

除了@mjuarez 提到的内容之外,您还可以尝试使用两个 Kafka 生产者属性。一个是ProducerConfig.ACKS_CONFIG,它允许您设置您认为对您的用例安全的确认级别。这个旋钮有三个可能的值。来自卡夫卡doc

acks=0: Producer 不关心来自服务器的确认,并认为它已发送。 acks=1:这意味着领导者会将记录写入其本地日志,但会在不等待所有追随者完全确认的情况下做出响应。 acks=all:这意味着领导者将等待完整的同步副本集来确认记录。

另一个属性是ProducerConfig.RETRIES_CONFIG。设置大于零的值将导致客户端重新发送任何发送失败的记录,并可能出现暂时性错误。

【讨论】:

以上是关于断言 Kafka 发送工作的主要内容,如果未能解决你的问题,请参考以下文章

接口自动化测试实践指导(下):接口自动化测试断言设置思路

JMeter断言处理之响应断言

断言消息被发送到自己的 OCUnit

未发送预期的 [App\Mail\WelcomeEmail] 可邮寄。无法断言 false 为 true

Jmeter 02 Jmeter断言之响应断言

如何断言应用程序使用 POST 请求向 API 服务器发送正确的数据