scala中的Flink Kafka程序给出超时错误org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败

Posted

技术标签:

【中文标题】scala中的Flink Kafka程序给出超时错误org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败【英文标题】:Flink Kafka program in scala giving timeout error org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms 【发布时间】:2017-12-15 11:30:18 【问题描述】:

我正在编写如下 Flink Kafka 集成程序,但出现 kafka 超时错误:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010, 
FlinkKafkaProducer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

object StreamKafkaProducer 

def main(args: Array[String]) 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("serializer.class", "kafka.serializer.StringEncoder")


val stream: DataStream[String] =env.fromElements(
  ("Adam"),
  ("Sarah"))

val kafkaProducer = new FlinkKafkaProducer010[String](
  "localhost:9092",
  "output",
  new SimpleStringSchema
)
// write data into Kafka
stream.addSink(kafkaProducer)

env.execute("Flink kafka integration  ")


从终端我可以看到 kafka 和 zookeeper 正在运行,但是当我从 Intellij 运行上面的程序时,它显示了这个错误:

C:\Users\amdass\workspace\flink-project-master>sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; 
support was removed in 8.0
[info] Loading project definition from C:\Users\amdass\workspace\flink-
project-master\project
[info] Set current project to Flink Project (in build 
file:/C:/Users/amdass/workspace/flink-project-master/)
[info] Compiling 1 Scala source to C:\Users\amdass\workspace\flink-project-
master\target\scala-2.11\classes...
[info] Running org.example.StreamKafkaProducer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-563113020] 
with leader session id 5a637740-5c73-4f69-a19e-c8ef7141efa1.
12/15/2017 14:41:49     Job execution switched to status RUNNING.
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to SCHEDULED
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to DEPLOYING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(2/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(1/4) switched to RUNNING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to FINISHED
12/15/2017 14:42:50     Sink: Unnamed(1/4) switched to FAILED
<b>  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms. </b>

12/15/2017 14:42:50     Sink: Unnamed(2/4) switched to FAILED
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms.

12/15/2017 14:42:50     Job execution switched to status FAILING.

org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。 2017 年 12 月 15 日 14:42:50 作业执行切换到 FAILED 状态。 [错误] (run-main-0) org.apache.flink.runtime.client.JobExecutionException:作业执行失败。 org.apache.flink.runtime.client.JobExecutionException:作业执行失败。 在 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:933) 在 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876) 在 org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876) 在 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 在 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 在 akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 在 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在

scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107 ) 引起:org.apache.kafka.common.errors.TimeoutException:更新失败 60000 毫秒后的元数据。 [trace] 堆栈跟踪被抑制:最后运行 *:run 以获取完整输出。 java.lang.RuntimeException:非零退出代码:1 在 scala.sys.package$.error(package.scala:27​​) [trace] 堆栈跟踪被抑制:运行最后一次编译:运行完整输出。 [错误](编译:运行)非零退出代码:1 [错误] 总时间:75 s,2017 年 12 月 15 日下午 2:42:51 完成

【问题讨论】:

【参考方案1】:

请检查并确保您的 Kafka 服务器正在运行。 这个错误一般是当你的 Flink 程序无法连接到 Kafka 服务器时。 Flink 会自动尝试以某个阈值时间连接到 Kafka 服务器。一旦达到此阈值并且 Flink 仍然无法与 Kafka 建立连接,那么它将抛出此 org.apache.kafka.common.errors.TimeoutException

请检查您的 Kafka 服务器详细信息、Kafka 主题,并验证您的 Kafka 服务器是否正在运行。

【讨论】:

以上是关于scala中的Flink Kafka程序给出超时错误org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败的主要内容,如果未能解决你的问题,请参考以下文章

任何人都可以在 Scala 中分享 Flink Kafka 示例吗?

Flink输出到Kafka(两种方式)

kafka学习,kafka踩坑,使用java项目连接kafka发送消息报错连接超时问题

如何在 Flink 独立集群上的 Flink 作业中使用两个 Kerberos 密钥表(用于 Kafka 和 Hadoop HDFS)?

Flink集成Kafka

Kafka与Flink集成