Spring Kafka 异步发送调用块
Posted
技术标签:
【中文标题】Spring Kafka 异步发送调用块【英文标题】:Spring Kafka asynchronous send calls block 【发布时间】:2017-12-18 11:25:17 【问题描述】:我使用的是 Spring-Kafka 版本 1.2.1,当 Kafka 服务器关闭/无法访问时,异步发送调用会阻塞一段时间。这似乎是 TCP 超时。代码是这样的:
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>()
@Override
public void onSuccess(SendResult<K, V> result)
...
@Override
public void onFailure(Throwable ex)
...
);
我快速浏览了 Spring-Kafka 代码,它似乎只是将任务传递给 kafka 客户端库,将回调交互转换为未来的对象交互。查看kafka客户端库,代码变得更加复杂,我没有花时间去理解它,但我猜它可能是在同一个线程中进行远程调用(至少是元数据?)。
作为用户,我希望返回未来的 Spring-Kafka 方法立即返回,即使远程 kafka 服务器无法访问。
如果我的理解是错误的或者这是一个错误,任何确认都将受到欢迎。我最终暂时将其设为异步。
另一个问题是 Spring-Kafka 文档一开始就说它提供了同步和异步发送方法。我找不到任何不返回期货的方法,也许文档需要更新。
如果需要,我很乐意提供更多详细信息。谢谢。
【问题讨论】:
你能找到解决方案吗? 不。我已经实现了我最终需要的东西,并且一直保持这种状态。既然已经很久了,也许更新的版本解决了这些问题?你试过 2.1.10 和 2.2.0 吗? 【参考方案1】:除了配置类上的@EnableAsync注解外,如果您调用此代码,还需要在方法上使用@Async注解。
http://www.baeldung.com/spring-async
这里有一些代码片段。 Kafka 生产者配置:
@EnableAsync
@Configuration
public class KafkaProducerConfig
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("$kafka.brokers")
private String servers;
@Bean
public Map<String, Object> producerConfigs()
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper)
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper)
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
@Bean
public Producer producer()
return new Producer();
还有制作人本身:
public class Producer
public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;
@Async
public void send(String topic, GenericMessage message)
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>()
@Override
public void onSuccess(final SendResult<String, GenericMessage> message)
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
@Override
public void onFailure(final Throwable throwable)
LOGGER.error("unable to send message= " + message, throwable);
);
【讨论】:
感谢您的回复。不,我没有使用这些注释,文档中没有关于它们的内容。我会尝试两种方法,如果它可以解决问题,请告诉您。 不幸的是,使用 EnableAsync 并没有改变任何东西。另外,从链接中我了解到应该使用 Async 注释的是 spring-kafka 库,因为它为我提供了未来的对象。 我同意你的观点,对我来说,你提供期货没有意义,但无论如何我必须放置注释。在我们的例子中,放置这两个注释使它像一个魅力一样工作。我将编辑响应添加一些代码片段。 我对你的代码片段的理解是你自己的代码是异步的,用spring的方式来做。它对你有用,因为 send 方法返回 void,所以 Spring 在一个新线程中执行它的内容并立即返回给 send 的调用者。但是,您进行的 spring-kafka 调用保持同步。您的代码与我在代码中为解决问题所做的类似,但我使用的是纯 Java,因为我需要更多控制。对于您和我的解决方案,如果 spring-kafka 做得对,我希望执行流程能够更改线程 2 次。【参考方案2】:如果我看一下KafkaProducer本身,发送消息有两个部分:
-
将消息存储到内部缓冲区中。
将消息从缓冲区上传到 Kafka。
KafkaProducer 对于第二部分是异步的,而不是第一部分。
send() 方法仍然可以在第一部分被阻塞并最终抛出 TimeoutExceptions,例如:
主题的元数据未缓存或陈旧,因此生产者尝试从服务器获取元数据以了解主题是否仍然存在以及它有多少分区。 缓冲区已满(默认为 32MB)。如果服务器完全没有响应,您可能会遇到这两个问题。
更新:
我在 Kafka 2.2.1 中测试并确认了这一点。看起来这种行为在 2.4 和/或 2.6 中可能有所不同:KAFKA-3720
【讨论】:
哇,将近 3 年的实际答案,谢谢。我什至不再使用它,但实际上我现在花时间查找当前文档以查看是否在那里进行了描述,但事实并非如此。该文档当然需要改进。 对不起,我按回车并发送了评论,然后花了 5 多分钟来编辑...所以这里休息一下:我没想到会出现这种行为,我预计会从未来的对象中得到任何错误使用异步发送方法。在同步和异步版本中返回未来也无济于事......再次感谢您抽出宝贵时间。 其实我自己也遇到过这个问题,在我意识到它已经 3 岁之前就回答了你的问题 :-) 阅读您提到的问题,这种行为似乎是合理的。但是,这确实需要在文档中进行解释。但正如那里所说,即使是测试也期待不同的行为:P【参考方案3】:最佳解决方案是在 Producer 级别添加一个“回调”侦听器。
@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate()
KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
kt.setProducerListener(new ProducerListener<String, WebUserOperation>()
@Override
public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata)
System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = "
+ recordMetadata.partition() +" with offset= " + recordMetadata.offset()
+ " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
@Override
public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception)
System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
exception.printStackTrace();
);
return kt;
【讨论】:
我知道我迟到了将近 18 个月,但如果您/有人能解释一下如何解决所描述的问题,将会很有帮助。【参考方案4】:只是为了确定。您是否应用了 @EnableAsync 注释?我想说这可能是指定 Future
行为的关键【讨论】:
感谢您的回复。不,我没有使用这个注释,文档中没有任何关于它的内容。我会尝试一下,如果它解决了问题,请告诉您。 不幸的是,使用 @EnableAsync 并没有改变任何东西 =/【参考方案5】:下面的代码可以让我异步获得响应
ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
new Thread(runnable).start();
public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>>
@Override
public void onFailure(Throwable exception)
log.error("unable to send message: " + exception.getMessage());
@Override
public void onSuccess(SendResult<UUID, ScreeningEvent> result)
log.debug("sent message with offset= messageID=", result.getRecordMetadata().offset(), result.getProducerRecord().key());
public class SendResult<K, V>
private final ProducerRecord<K, V> producerRecord;
private final RecordMetadata recordMetadata;
public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata)
this.producerRecord = producerRecord;
this.recordMetadata = recordMetadata;
public ProducerRecord<K, V> getProducerRecord()
return this.producerRecord;
public RecordMetadata getRecordMetadata()
return this.recordMetadata;
@Override
public String toString()
return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
【讨论】:
这当然可行,但您只是自己使其异步,这正是我当时所做的并在原始问题中所说的。那时我想知道这是否是 spring kafka 中的一个错误,或者我是否遗漏了什么...... @CarlosE.L.Augusto:请分享您为处理代码问题所做的工作。以上是关于Spring Kafka 异步发送调用块的主要内容,如果未能解决你的问题,请参考以下文章
spring学习笔记(15)趣谈spring 事件:实现业务逻辑解耦,异步调用提升用户体验