断言 Kafka 发送工作
Posted
技术标签:
【中文标题】断言 Kafka 发送工作【英文标题】:Assert Kafka send worked 【发布时间】:2019-03-09 23:38:21 【问题描述】:我正在用 Spring Boot 编写一个应用程序,所以我要写信给 Kafka:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
然后在我的方法里面:
kafkaTemplate.send(topic, data)
但我觉得我只是依靠这个来工作,我怎么知道这是否有效?如果它是异步的,返回 200 代码并希望它确实有效是一个好习惯吗?我糊涂了。如果 Kafka 不可用,这不会失败吗?不应该提示我捕获异常吗?
【问题讨论】:
【参考方案1】:您可以在向 kafka 发送消息时使用以下命令:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name
当上面的命令运行时,你应该运行你的代码,如果发送消息成功,那么消息必须打印在控制台上。
此外,与任何资源的任何其他连接一样,如果无法建立连接,则执行任何类型的操作都会引发一些异常。
【讨论】:
他在询问如何从 java 代码库中验证这一点,而不是运行外部命令。 @mjuarez 哇,那么这是否意味着如果我们使用基本的 cosnole 消费者它不起作用? :)【参考方案2】:是的,如果 Kafka 不可用,.send()
调用将失败,但如果您异步发送,则不会通知任何人。您可以指定一个回调,当未来最终完成时要执行。完整的接口规范在这里:https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Callback.html
来自官方 Kafka javadoc:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
完全非阻塞使用可以利用回调参数来 提供在请求完成时将调用的回调。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() public void onCompletion(RecordMetadata metadata, Exception e) if(e != null) e.printStackTrace(); else System.out.println("The offset of the record we just sent is: " + metadata.offset()); );
【讨论】:
【参考方案3】:除了@mjuarez 提到的内容之外,您还可以尝试使用两个 Kafka 生产者属性。一个是ProducerConfig.ACKS_CONFIG
,它允许您设置您认为对您的用例安全的确认级别。这个旋钮有三个可能的值。来自卡夫卡doc
acks=0
: Producer 不关心来自服务器的确认,并认为它已发送。
acks=1
:这意味着领导者会将记录写入其本地日志,但会在不等待所有追随者完全确认的情况下做出响应。
acks=all
:这意味着领导者将等待完整的同步副本集来确认记录。
另一个属性是ProducerConfig.RETRIES_CONFIG
。设置大于零的值将导致客户端重新发送任何发送失败的记录,并可能出现暂时性错误。
【讨论】:
以上是关于断言 Kafka 发送工作的主要内容,如果未能解决你的问题,请参考以下文章