flink+Kafka:获取主机名端口

Posted

技术标签:

【中文标题】flink+Kafka:获取主机名端口【英文标题】:flink+Kafka: getHostnamePort 【发布时间】:2016-05-24 07:17:46 【问题描述】:

我想从flink读取一个kafka主题

package Toletum.pruebas;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafka 
  public static void main(String[] args) throws Exception 
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>("test02", 
      new SimpleStringSchema(), 
      parameterTool.getProperties());
      
      DataStream<String> messageStream = env.addSource(kafkaSrc);
      
    messageStream.rebalance().map(new MapFunction<String, String>() 
      private static final long serialVersionUID = -6867736771747690202L;
  
      public String map(String value) throws Exception 
        return "Kafka and Flink says: " + value;
      
    ).print();

    env.execute("LeeKafka");
  


此代码成功运行:

java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

但是,当我尝试从 flink 使用时:

flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

我收到一个错误:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL; 在 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592) 在 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280) 在 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49) 在 Toletum.pruebas.LeeKafka.main(LeeKafka.java:22) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497) 在 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) 在 org.apache.flink.client.program.Client.runBlocking(Client.java:252) 在 org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676) 在 org.apache.flink.client.CliFrontend.run(CliFrontend.java:326) 在 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978) 在 org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

【问题讨论】:

会不会是你编译作业的版本和集群上运行的Flink版本不相等? 谢谢....我在 pom.xml 中使用的是旧版本 【参考方案1】:

旧版本库.....

正确的 pom.xml:



            &ltdependency&gt
                    &ltgroupId&gtorg.apache.flink&lt/groupId&gt
                    &ltartifactId&gtflink-connector-kafka&lt/artifactId&gt
                    &ltversion&gt0.10.1&lt/version&gt
            &lt/dependency&gt

【讨论】:

【参考方案2】:

此问题是由于使用旧版本的 FLink 连接器库所致。

您可以查看最新的可用库并下载最新的 Maven 依赖项。

您使用的 Kafka 版本也应考虑在内。

尝试使用来自 Flink Documentation for Kafka Connector 的最新 Maven 依赖项

最新的maven依赖是

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3.2</version>
</dependency>

【讨论】:

以上是关于flink+Kafka:获取主机名端口的主要内容,如果未能解决你的问题,请参考以下文章

如何解析 Postgresql JDBC url 以获取主机名、端口和 db_name

如何从 http 或 https 请求中获取带有端口的主机名

如何获取部署 Java 适配器的 IBM MobileFirst 服务器的主机名和端口?

获取本地主机名和 IP 地址的 C++ Windows 函数调用

如何覆盖访问主机名和端口的架构注册表主机

C++ 字符串匹配(主机名和端口)