如何在 Scala 中实现 Kafka Consumer
Posted
技术标签:
【中文标题】如何在 Scala 中实现 Kafka Consumer【英文标题】:How do I implement Kafka Consumer in Scala 【发布时间】:2016-12-10 00:04:35 【问题描述】:我正在尝试在 scala 中实现一个 kafka 消费者。我已经看过一百万篇关于如何用 Java 进行操作的教程,甚至有些 (like this one) 说它是针对 scala 的,但它是用 Java 编写的。
有谁知道我在哪里可以找到如何用 Scala 编写它的示例?我才刚刚开始学习 Scala,所以即使链接示例是用 Java 或其他东西编写的,也可能可以在 Scala 中使用,但老实说,我现在不知道我在做什么。我在谷歌上搜索的所有内容都只是将我链接到如何用 Java 进行操作。
【问题讨论】:
您可以使用 Scala 中的所有 Java 代码,只需很少的更改。 我可以在 Java 中创建类,然后将其导入到我想要使用的类中吗?或者我是否需要将所有变量和东西重写为 scala? 没关系,我的 scala 测试无法识别 java 类。这是 Java 中的类 (pastebin.com/tnS9Amie),我只是不太了解 scala 来转换它。看起来有可能吗? 您能详细说明“不会识别”吗?这是怎么回事?您的项目结构如何? 【参考方案1】:您看到大多数 Java 示例的原因是,从 0.8.2.2 开始的新 KafkaProducer
是用 Java 编写的。
假设您使用 sbt 作为构建系统,并假设您使用 Kafka 0.8.2.2(您可以根据需要更改版本),您将需要:
libraryDependencies ++=
Seq(
"org.apache.kafka" %% "kafka" % "0.8.2.2",
"org.apache.kafka" % "kafka-clients" % "0.8.2.2",
)
一个简单的例子应该让你开始:
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaExample
def main(args: Array[String]): Unit =
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("group.id", "consumer-tutorial")
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](properties)
kafkaConsumer.subscribe("firstTopic", "secondTopic")
while (true)
val results = kafkaConsumer.poll(2000).asScala
for ((topic, data) <- results)
// Do stuff
【讨论】:
消费者不应该与 zookeeper 客户而不是经纪人交谈吗? @AvihooMamka Kafka 不再“需要”ZooKeeper 来跟踪偏移量。如何做到这一点完全取决于您。通常,消费者会与经纪人交谈以进行消费。results
始终为 null
。我错过了什么?
@ItayB 你确定你从正确的主题消费?它有消息吗?你的消费策略是什么?最早还是最晚?
@ItayB 你需要this,查看那里的实现。我会更新我的答案。【参考方案2】:
您还可以在这里查看完全基于 Scala 构建的工作模板:https://github.com/knoldus/activator-kafka-scala-producer-consumer 此应用程序包含您要在此处使用的代码。
希望我能解决您的问题,谢谢!
【讨论】:
以上是关于如何在 Scala 中实现 Kafka Consumer的主要内容,如果未能解决你的问题,请参考以下文章