Spark Streaming + Kafka:SparkException:找不到Set的领导者偏移量

Posted

技术标签:

【中文标题】Spark Streaming + Kafka:SparkException:找不到Set的领导者偏移量【英文标题】:Spark Streaming + Kafka: SparkException: Couldn't find leader offsets for Set 【发布时间】:2016-03-21 04:44:58 【问题描述】:

我正在尝试设置 Spark Streaming 以从 Kafka 队列中获取消息。我收到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)

这是我正在执行的代码 (pyspark):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], "metadata.broker.list": "host.domain:9092")

ssc.start()
ssc.awaitTermination()

有几个类似的帖子有相同的错误。在所有情况下,原因都是空的 kafka 主题。我的“测试主题”中有消息。我可以把它们弄出来

kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100

有谁知道可能是什么问题?

我正在使用:

Spark 1.5.2 (apache) Kafka 0.8.2.0+kafka1.3.0 (CDH 5.4.7)

【问题讨论】:

我认为这是缺少领导者的问题,请查看helpful i think 我也遇到了同样的问题,请问您找到解决方法了吗?我使用 spark 1.6.1 和 kafka 0.8.2.1 我将我的偏移量存储在 zookeeper 中。我清除/休息了我存储的偏移量,这个错误不再出现。 【参考方案1】:

如果您在 /etc/hosts 中定义短主机名并在您的 kafka 服务器配置中使用它们,您应该将这些名称更改为 ip。或者在本地 PC 或客户端的 /etc/hosts 中注册相同的短主机名。

发生错误,因为 Spark 流媒体库无法解析 PC 或客户端中的短主机名。

【讨论】:

【参考方案2】:

你需要检查两件事:

    检查这个主题和分区是否存在,你的情况是主题是test-topic,分区是0。

    根据您的代码,您正在尝试从偏移量 0 消费消息,并且可能无法从偏移量 0 获得消息,请检查您最早的偏移量是什么,然后尝试从那里消费。

    李>

以下是检查最早偏移量的命令:

sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -1 

【讨论】:

【参考方案3】:

1) 你必须确保你已经创建了主题test-topic

运行以下命令查看主题列表

kafka-topics.sh --list --zookeeper [host or ip of zookeeper]:[port]

2) 检查你的主题后,你必须在Socket Server Settings部分配置你的Kafka配置

listeners=PLAINTEXT://[host or ip of Kafka]:[port]

【讨论】:

【参考方案4】:

如果主题不存在则强制创建主题的另一个选项。您可以通过像这样在 kafkaParams 映射中将属性“auto.create.topics.enable”设置为“true”来做到这一点。

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaHost,
  "group.id" -> kafkaGroup,
  "auto.create.topics.enable" -> "true")

使用 Scala 2.11 和 Kafka 0.10 版本。

【讨论】:

【参考方案5】:

无法为指定主题找到领导者的此类错误的原因之一是一个人的 Kafka 服务器配置问题。

打开您的 Kafka 服务器配置:

vim ./kafka/kafka-<your-version>/config/server.properties

在“套接字服务器设置”部分中,如果您的主机缺少 IP,请提供它:

listeners=PLAINTEXT://host-ip:host-port

我正在使用 MapR 沙箱提供的 Kafka 设置,并试图通过 spark 代码访问 kafka。我在访问我的 kafka 时遇到了同样的错误,因为我的配置缺少 IP。

【讨论】:

以上是关于Spark Streaming + Kafka:SparkException:找不到Set的领导者偏移量的主要内容,如果未能解决你的问题,请参考以下文章

spark streaming kafka example

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID