KafkaProducer:`callback`和返回的`Future`之间的区别?
Posted
技术标签:
【中文标题】KafkaProducer:`callback`和返回的`Future`之间的区别?【英文标题】:KafkaProducer: Difference between `callback` and returned `Future`? 【发布时间】:2017-09-21 22:35:18 【问题描述】:KafkaProducer send method 都返回一个 Future 并接受一个回调。
在发送完成后使用一种机制而不是另一种机制来执行操作有什么根本区别吗?
【问题讨论】:
也许您必须使用返回的 Futures 序列手动实现回调的排序保证? 【参考方案1】:查看您链接到的文档,Future 和 Callback 之间的主要区别在于谁发起“请求已完成,现在呢?”问题。
假设我们有一个客户C
和一个面包师B
。 C
要求 B
给他做一个好吃的饼干。现在面包师可以通过 2 种可能的方式将美味的曲奇返还给顾客。
未来
面包师接受请求并告诉客户:好的,等我完成后,我会将您的饼干放在柜台上。 (本协议为Future
。)
在这种情况下,客户负责检查柜台 (Future
) 以查看面包师是否完成了他的饼干。
阻止 顾客在柜台附近看着它,直到饼干放在那里(Future.get())或面包师在那里道歉(错误:饼干面团用完了)。
非阻塞 客户做一些其他的工作,偶尔检查一下 cookie 是否在柜台上等着他(Future.isDone())。如果 cookie 准备好了,客户就会拿走它 (Future.get())。
回调
在这种情况下,客户在订购了他的饼干后告诉面包师:当我的饼干准备好后,请把它交给我的宠物机器狗,他会知道如何处理它(这个机器人就是回调)。
现在,当饼干准备好后,面包师将饼干交给狗,并告诉他跑回它的主人身边。面包师可以继续为其他顾客烘烤下一块饼干。
狗跑回顾客身边,开始摇摆它的人造尾巴,让顾客知道他的饼干已经准备好了。
请注意,客户不知道什么时候会给他饼干,他也没有主动询问面包师是否准备好了。
这是两个场景之间的主要区别。谁负责发起“你的 cookie 准备好了,你想用它做什么?”题。对于 Future,客户有责任通过积极等待或不时轮询来检查它何时准备就绪。在回调的情况下,面包师将回调提供的函数。
我希望这个答案能让您更好地了解 Future 和 Calback 究竟是什么。一旦你有了大致的想法,你就可以尝试找出每个特定的事情是在哪个线程上处理的。当一个线程被阻塞时,或者一切以什么顺序完成。编写一些简单的程序来打印诸如“主客户端线程:收到的 cookie”之类的语句可能是一种有趣的试验方式。
【讨论】:
【参考方案2】:异步方法
producer.send(record, new Callback()
@Override
onComplete(RecordMetadata rm, Exception ex)...
)
与同步相比,提供更好的吞吐量
RecordMetadata rm = producer.send(record).get();
因为您在第一种情况下不等待确认。
同样,在异步方式中,不能保证排序,而在同步方式中,只有在收到确认后才会发送消息。
另一个区别可能是,在同步调用中,如果发生异常,您可以在异常发生后立即停止发送消息,而在第二种情况下,一些消息将在您发现错误之前发送,并且执行一些操作。
另请注意,在异步方法中,“正在运行”的消息数量由max.in.flight.requests.per.connection
参数控制。
除了同步和异步方法之外,您还可以使用 Fire and Forget 方法,这与同步方法几乎相同,但不处理返回的元数据 - 只需发送消息并希望它到达broker(知道它很可能会发生,如果出现可恢复的错误,生产者会重试),但有可能会丢失一些消息:
RecordMetadata rm = producer.send(record);
总结一下:
Fire and Forget - 最快的一种,但有些消息可能会丢失; 同步 - 最慢,如果您无法承受丢失消息的损失,请使用它; 异步 - 介于两者之间。【讨论】:
如果我不马上在返回的Future
上调用get
会不会也是异步的?
但我想能够使其同步是 Future 方法的一个优势。
添加有关具有排序保证的回调的位,然后这是公认的答案。
嗯,添加了@Thilo
在所有示例中,我看到每次我们执行 producer.send() 时,我们也在创建一个新的回调对象。可以一次创建回调并用于所有 send()。像 CallBack c = new Callback(); producer.send(message,c);【参考方案3】:
主要区别在于是否要阻塞等待确认的调用线程。
以下使用Future.get() 方法会阻塞当前线程,直到发送完成后再执行某些操作。
producer.send(record).get()
// Do some action
当使用回调执行某些操作时,代码将在 I/O 线程中执行,因此它对于调用线程是非阻塞的。
producer.send(record,
new Callback()
// Do some action
);
虽然the docs 说它“通常”在生产者中执行:
请注意,回调通常会在生产者的 I/O 线程中执行,因此应该相当快,否则它们会延迟从其他线程发送消息。如果要执行阻塞或计算量大的回调,建议在回调主体中使用自己的 Executor 来并行处理。
【讨论】:
好的,但我不需要马上拨打Future
上的get
。我可以推迟到它完成。
我不确定我是否理解正确,但是如果您不知道对 send() 的异步调用已完成,如何推迟对 get() 的调用?这就是为什么 Future 的 get() 方法会阻塞,直到结果可用。
有Future.isDone
。如果你在它变成true
之后才调用get
,get
将立即返回。最终必须有人轮询或阻塞才能获得结果,但不一定是调用 Kafka 的线程。
您可以收集所有Future
对象,然后在事务结束时一次性解决它们(无论这在您自己的上下文中意味着什么)。从理论上讲,这应该意味着等待仅在您完成所有处理后才会发生。文档和示例实践有点不清楚您是否可以为每个目标主题分区保留并解决一个 Future,或者这样做是否有任何优势。【参考方案4】:
我的观察基于The Kafka Producer documentation:
Future
让您可以访问同步处理
Future
可能不保证确认。我的理解是Callback
将在确认后执行
Callback
让您可以访问完全非阻塞异步处理。
还可以保证同一分区上回调的执行顺序
保证被发送到同一分区的记录的回调 按顺序执行。
我的另一个观点是Future
返回对象和Callback
'模式' 代表了两种不同的编程风格,我认为这是根本的区别:
Future
代表 Java 的并发模型样式。
Callback
代表 Java 的 Lambda 编程风格(因为 Callback 实际上满足了函数式接口的要求)
您最终可能会使用 Future
和 Callback
样式编写类似的行为,但在某些用例中,看起来一种样式可能比另一种更有利。
【讨论】:
您关于 Future 不保证确认的说法是错误的。在调用回调之后,Future 被标记为完成。参见 kafka java 客户端中的函数 completeFutureAndFireCallbacks@ProducerBatch 类。【参考方案5】:send() 是一种开始在 Kafka 集群上发布消息的方法。 send() 方法是一个异步调用,表示 send 方法在 Buffer 中累积消息并立即返回。这可以与 linger.ms 一起使用来批量发布消息以获得更好的性能。我们可以使用调用 send 方法处理异常和控制,同步使用 Future 上的 get 方法或异步回调。
每种方法都有自己的优缺点,可以根据用例来决定。
异步发送(Fire & Forget): 我们如下调用 send 方法调用发布消息,无需等待任何成功或错误响应。
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));
此方案不会等待完成第一条消息开始发送其他消息以进行发布。如果出现异常,生产者会根据重试配置参数重试,但如果重试后消息仍然失败,Kafka 生产者永远不会知道这一点。在这种情况下,我们可能会大量发送一些消息,但如果消息丢失很少,这将提供高吞吐量和高延迟。
同步发送 同步发送消息的一个简单方法是使用 get() 方法
RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();
Producer.send 返回 RecordMetadata 的 Future,当我们调用 .get() 方法时,它会得到 Kafka 的回复。我们可以在错误的情况下捕获错误或在成功的情况下返回 RecordMetadata。 RecordMetadata 包含偏移量、分区、时间戳来记录信息。它很慢,但提供了高可靠性并保证传递消息。
带回调的异步发送 我们还可以使用回调函数调用 send() 方法,该回调函数会在消息完成后返回响应。如果您喜欢以异步方式发送消息意味着不等待完成工作但同时处理错误或更新有关消息传递的状态,这很好。
producer.send(record, new Callback()
@Override
onComplete(RecordMetadata recodMetadata, Exception ex)...
)
注意:请不要将 ack & retries 与异步发送调用混淆。确认和重试将应用于每个发送调用,无论是同步调用还是异步调用,唯一的问题是如何处理返回消息和失败情况。例如,如果您发送异步发送仍然 ack 和 retries 规则被应用,但将在一个独立的线程上,而不阻塞其他线程发送并行记录。失败时我们不会意识到的唯一挑战以及消息成功完成的时间。
【讨论】:
以上是关于KafkaProducer:`callback`和返回的`Future`之间的区别?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Producer拦截器(Interceptor)
4深潜KafkaProducer —— RecordAccumulator精析