如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?
Posted
技术标签:
【中文标题】如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?【英文标题】:How to improve slow performance of reactive-kafka (Scala plus Akka Streams)? 【发布时间】:2016-07-17 12:45:03 【问题描述】:我正在将我的项目从 RabbitMQ 转移到 Kafka,并试图了解 reactive-kafka 的速度。
我目前能够以大约 12K/秒的速度向 Rabbit 写入大约 12K/秒的普通消息,并在读取时通过“hello world”流以大约 4K/秒的速度从队列中拉出微不足道的数据。
我使用反应流迁移到 Kafka,我可以写大概 1M/秒——巨大的胜利! 但是在相同的环境中,我只能使用此处示例中的方法以大约 2K/秒的速度通过读取流:DummyConsumer.scala
有没有人知道如何将读数恢复到与 Rabbit 方法相当的水平?
有趣:我只是“直接”尝试了它(通过原始 Java 驱动程序与响应式 kafka 访问 Kafka)并且获得了大约 22K 的读取,所以这非常好。它说明了我如何使用响应式卡夫卡正在减慢速度。
好的...我正在寻找这个东西。接下来我尝试了一个原始的 Akka 流“hello world”:
now = System.currentTimeMillis()
count = 0
val in2 = Source(1 to num)
val g = RunnableGraph.fromGraph(GraphDSL.create() implicit b =>
import GraphDSL.Implicits._
val show = Flow[Int].map i => count +=1; if(count==num) println(s"time 2 ($count): "+(System.currentTimeMillis() - now)); i
in2 ~> show ~> Sink.ignore
ClosedShape
)
g.run()
Thread.sleep(2000)
这以非常快的 742K/秒运行!所以 Kafka raw 速度很快,而 Akka 流速度很快。因此,罪魁祸首在于反应式 kafka 的构建方式(或更可能)我尝试使用它的方式。考虑到摩擦,我应该期望看到接近原始 kafka 的 22K/秒的东西。嗯。
【问题讨论】:
嗨 Greg,这里的反应式 kafka 开发人员。我们正在完成 reactive-kafka 0.11,它应该会给你带来更好的性能。但是,旧版本的性能应该比仅 2K/s 更高。你的代码还在吗?我想看看你如何使用 reactive-kafka。 不幸的是我没有那个代码了。基本上我正在从文档中的示例代码中进行剪切粘贴......没什么特别的。我转移并开发了一个项目 LateKafka(在 github 中),为 Akka 流创建源。它真的很快——我正在为一个微不足道的流提取每秒 6 位数的事务处理 kafka-thru-stream。关于 0.11 的激动人心的消息。试试看会很酷。 嗨 Jacek,我刚刚使用新的 0.11 版本重建了我的测试。它比 M 系列快得多,但遗憾的是我仍然无法重现其他人显示的高数字。注意我只是想获得一些性能——我的真实用例使用流 DSL,文档中没有显示。我在另一个问题中发布了代码:***.com/questions/39617827/… 嗨 Greg,我们从 akka-streams 切换到使用 vanilla Kafka 也看到了良好的性能提升。 【参考方案1】:不是一个完整的答案,但请再试一次,但最近(2016 年 9 月)Akka Streams Kafka 0.11
有显着的性能改进。我们已经完成了一些基准测试,虽然 react-kafka 包装器仍然存在一些开销(尽管请记住,您也会得到一些回报:例如,所有好处都来自具有异步背压的良好 API!),总体数字看起来非常好,而且越来越好。
这里有几个常见的Kafka使用场景,比较旧的reactive-kafka版本(M4),当前版本(0.11),以及使用普通Kafka实现的等效功能
Producers
/Consumers
(但没有考虑到,当然,与任何其他反应性组件连接)。
在Krzysiek Ciesielski 的“Benchmarking akka-stream-kafka”上查看更多信息。
【讨论】:
我在使用新库时遇到了问题。 Ran 代码如下要点:gist.github.com/gzoller/8847556ab1347b886d80cf53c3a1e867 这产生了一堆“死信”错误,试图使用文档中的批处理建议。我还尝试了一个变体,我只提交每个批次的头部(即最新的偏移量)。这并没有给我死信错误,但它也没有产生一组干净的分区偏移量。 @Greg Strange。您可以根据该示例提出一个新问题,看看是否有真正的 Akka Streams Kafka 专家可以加入。以上是关于如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?的主要内容,如果未能解决你的问题,请参考以下文章