如何在 Scala 中实现 Kafka Consumer

Posted

技术标签:

【中文标题】如何在 Scala 中实现 Kafka Consumer【英文标题】:How do I implement Kafka Consumer in Scala 【发布时间】:2016-12-10 00:04:35 【问题描述】:

我正在尝试在 scala 中实现一个 kafka 消费者。我已经看过一百万篇关于如何用 Java 进行操作的教程,甚至有些 (like this one) 说它是针对 scala 的,但它是用 Java 编写的。

有谁知道我在哪里可以找到如何用 Scala 编写它的示例?我才刚刚开始学习 Scala,所以即使链接示例是用 Java 或其他东西编写的,也可能可以在 Scala 中使用,但老实说,我现在不知道我在做什么。我在谷歌上搜索的所有内容都只是将我链接到如何用 Java 进行操作。

【问题讨论】:

您可以使用 Scala 中的所有 Java 代码,只需很少的更改。 我可以在 Java 中创建类,然后将其导入到我想要使用的类中吗?或者我是否需要将所有变量和东西重写为 scala? 没关系,我的 scala 测试无法识别 java 类。这是 Java 中的类 (pastebin.com/tnS9Amie),我只是不太了解 scala 来转换它。看起来有可能吗? 您能详细说明“不会识别”吗?这是怎么回事?您的项目结构如何? 【参考方案1】:

您看到大多数 Java 示例的原因是,从 0.8.2.2 开始的新 KafkaProducer 是用 Java 编写的。

假设您使用 sbt 作为构建系统,并假设您使用 Kafka 0.8.2.2(您可以根据需要更改版本),您将需要:

libraryDependencies ++= 
  Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2",
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2",
  )

一个简单的例子应该让你开始:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample 
  def main(args: Array[String]): Unit = 
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "consumer-tutorial")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])

    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    kafkaConsumer.subscribe("firstTopic", "secondTopic")

    while (true) 
      val results = kafkaConsumer.poll(2000).asScala
      for ((topic, data) <- results) 
        // Do stuff
      
    

【讨论】:

消费者不应该与 zookeeper 客户而不是经纪人交谈吗? @AvihooMamka Kafka 不再“需要”ZooKeeper 来跟踪偏移量。如何做到这一点完全取决于您。通常,消费者会与经纪人交谈以进行消费。 results 始终为 null。我错过了什么? @ItayB 你确定你从正确的主题消费?它有消息吗?你的消费策略是什么?最早还是最晚? @ItayB 你需要this,查看那里的实现。我会更新我的答案。【参考方案2】:

您还可以在这里查看完全基于 Scala 构建的工作模板:https://github.com/knoldus/activator-kafka-scala-producer-consumer 此应用程序包含您要在此处使用的代码。

希望我能解决您的问题,谢谢!

【讨论】:

以上是关于如何在 Scala 中实现 Kafka Consumer的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala 中实现真正的 Singleton

您将如何在 Scala 中实现缓存方面

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

如何在火花聚合函数中实现scala类型安全

如何在Scala中实现一个真正的Singleton

Scala - 如何在 Spark 的 map 函数中实现 Try