ClassNotFoundException:org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 同时消费一个 kafka 主题

Posted

技术标签:

【中文标题】ClassNotFoundException:org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 同时消费一个 kafka 主题【英文标题】:ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointNotifier while consuming a kafka topic 【发布时间】:2017-02-23 12:35:48 【问题描述】:

我正在使用最新的 Flink-1.1.2-Hadoop-27 和 flink-connector-kafka-0.10.2-hadoop1 jars。

Flink 消费者如下:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        if (properties == null) 
            properties = new Properties();
            InputStream props = Resources.getResource(KAFKA_CONFIGURATION_FILE).openStream();
            properties.load(props);

            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer082<>(KAFKA_SIP_TOPIC, new SimpleStringSchema() , properties));

以下是我执行后得到的异常:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointNotifier
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at com.bt.oss.voice.main.FlnkConsumer.main(FlnkConsumer.java:50)Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointNotifier
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 25 more

【问题讨论】:

【参考方案1】:

您正在混合版本。 Flink 0.10.2 的 Kafka 消费者不适用于 Flink 1.1.2。

您应该使用 Flink 1.1.2 提供的 Kafka 连接器并包含以下 Maven 依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.1.2</version>
</dependency>

详情请查看documentation。

【讨论】:

【参考方案2】:

如果使用sbt,flink scala 依赖项中的以下更新解决了我的问题。

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",

到:

"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,

【讨论】:

以上是关于ClassNotFoundException:org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 同时消费一个 kafka 主题的主要内容,如果未能解决你的问题,请参考以下文章

ClassNotFoundException:org.sqlite.JDBC

ClassNotFoundException和NoClassDefFoundError

ClassNotFoundException和NoClassDefFoundError

ClassNotFoundException:oracle.jdbc.driver.OracleDriver

如何解决 ClassNotFoundException?

ClassNotFoundException / mysql jdbc驱动程序[重复]