如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?

Posted

技术标签:

【中文标题】如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?【英文标题】:How to improve slow performance of reactive-kafka (Scala plus Akka Streams)? 【发布时间】:2016-07-17 12:45:03 【问题描述】:

我正在将我的项目从 RabbitMQ 转移到 Kafka,并试图了解 reactive-kafka 的速度。

我目前能够以大约 12K/秒的速度向 Rabbit 写入大约 12K/秒的普通消息,并在读取时通过“hello world”流以大约 4K/秒的速度从队列中拉出微不足道的数据。

我使用反应流迁移到 Kafka,我可以写大概 1M/秒——巨大的胜利! 但是在相同的环境中,我只能使用此处示例中的方法以大约 2K/秒的速度通过读取流:DummyConsumer.scala

有没有人知道如何将读数恢复到与 Rabbit 方法相当的水平?

有趣:我只是“直接”尝试了它(通过原始 Java 驱动程序与响应式 kafka 访问 Kafka)并且获得了大约 22K 的读取,所以这非常好。它说明了我如何使用响应式卡夫卡正在减慢速度。

好的...我正在寻找这个东西。接下来我尝试了一个原始的 Akka 流“hello world”:

now = System.currentTimeMillis()
count = 0
val in2 = Source(1 to num)
val g = RunnableGraph.fromGraph(GraphDSL.create()  implicit b =>
  import GraphDSL.Implicits._
  val show = Flow[Int].map i => count +=1; if(count==num) println(s"time 2 ($count): "+(System.currentTimeMillis() - now)); i 
  in2 ~> show ~> Sink.ignore
  ClosedShape
)
g.run()
Thread.sleep(2000)

这以非常快的 742K/秒运行!所以 Kafka raw 速度很快,而 Akka 流速度很快。因此,罪魁祸首在于反应式 kafka 的构建方式(或更可能)我尝试使用它的方式。考虑到摩擦,我应该期望看到接近原始 kafka 的 22K/秒的东西。嗯。

【问题讨论】:

嗨 Greg,这里的反应式 kafka 开发人员。我们正在完成 reactive-kafka 0.11,它应该会给你带来更好的性能。但是,旧版本的性能应该比仅 2K/s 更高。你的代码还在吗?我想看看你如何使用 reactive-kafka。 不幸的是我没有那个代码了。基本上我正在从文档中的示例代码中进行剪切粘贴......没什么特别的。我转移并开发了一个项目 LateKafka(在 github 中),为 Akka 流创建源。它真的很快——我正在为一个微不足道的流提取每秒 6 位数的事务处理 kafka-thru-stream。关于 0.11 的激动人心的消息。试试看会很酷。 嗨 Jacek,我刚刚使用新的 0.11 版本重建了我的测试。它比 M 系列快得多,但遗憾的是我仍然无法重现其他人显示的高数字。注意我只是想获得一些性能——我的真实用例使用流 DSL,文档中没有显示。我在另一个问题中发布了代码:***.com/questions/39617827/… 嗨 Greg,我们从 akka-streams 切换到使用 vanilla Kafka 也看到了良好的性能提升。 【参考方案1】:

不是一个完整的答案,但请再试一次,但最近(2016 年 9 月)Akka Streams Kafka 0.11

有显着的性能改进。我们已经完成了一些基准测试,虽然 react-kafka 包装器仍然存在一些开销(尽管请记住,您也会得到一些回报:例如,所有好处都来自具有异步背压的良好 API!),总体数字看起来非常好,而且越来越好。

这里有几个常见的Kafka使用场景,比较旧的reactive-kafka版本(M4),当前版本(0.11),以及使用普通Kafka实现的等效功能Producers/Consumers(但没有考虑到,当然,与任何其他反应性组件连接)。

在Krzysiek Ciesielski 的“Benchmarking akka-stream-kafka”上查看更多信息。

【讨论】:

我在使用新库时遇到了问题。 Ran 代码如下要点:gist.github.com/gzoller/8847556ab1347b886d80cf53c3a1e867 这产生了一堆“死信”错误,试图使用文档中的批处理建议。我还尝试了一个变体,我只提交每个批次的头部(即最新的偏移量)。这并没有给我死信错误,但它也没有产生一组干净的分区偏移量。 @Greg Strange。您可以根据该示例提出一个新问题,看看是否有真正的 Akka Streams Kafka 专家可以加入。

以上是关于如何改善响应式 kafka(Scala 加 Akka Streams)的缓慢性能?的主要内容,如果未能解决你的问题,请参考以下文章

为啥kafka 用scala

Scala 进阶—— implicit 用法:隐式参数

如何在 Scala 中实现 Kafka Consumer

使用Scala的强大api快速加工数据

如何获取Kafka的消费者详情——从Scala到Java的切换

Scala.React 的状态如何? [关闭]