如何将 Kafka (Java) 应用程序从 Windows 连接到 Linux 中的 Confluent

Posted

技术标签:

【中文标题】如何将 Kafka (Java) 应用程序从 Windows 连接到 Linux 中的 Confluent【英文标题】:How to connect Kafka (Java) application from Windows to Confluent in Linux 【发布时间】:2019-08-20 14:12:56 【问题描述】:

我正在使用 Winscp 和 Putty 在 Linux 服务器上运行 Confluent 5.0。我在 Windows 中有 Kafka (Java/Eclipse) 应用程序。

当我运行 Java 应用程序时,它无法识别 Linux 上运行的 Confluent 中的 Kafka 代理。

我通过在 MAC 终端中运行 Confluent 5.0 测试了我的 Java 应用程序,该应用程序将数据发送到 MACBook 中的 Kafka 主题。现在我正在尝试在 Windows 中实现相同的 Kafka 应用程序。由于 Windows 不支持 Confluent,所以我在 Linux 服务器上运行。

我使用 Confluent 而不是 Apache Kafka,因为我在我的应用程序中使用了 Schema-registry。

通过使用 netstat -tupln & curl -v http://localhost:port no。发现 Kafka 在 8082 上运行,架构注册表在 8081 details of ports 上。 下面是我在 Java 应用程序中的 Kafka Properties。

public static Properties producerProperties() 

    // normal producer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    properties.setProperty("acks", "all");
    properties.setProperty("retries", "10");
    // avro part
    properties.setProperty("key.serializer", StringSerializer .class.getName());
    properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

    return properties;



public static Properties consumerProperties() 

   // Properties properties = new Properties();
    // normal consumer
    properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
    //different for consumer
    properties.setProperty("group.id", "Avro-consumer");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.offset.reset", "earliest");

    // avro part
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
    properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
    properties.setProperty("specific.avro.reader", "true");

    return properties;


public static Properties streamsProperties() 

    // normal consumer
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    return properties;

预期: 写入 Kafka 主题的数据。

实际: WARN 无法建立到节点 -1 的连接。经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient:589)

【问题讨论】:

【参考方案1】:

您需要确保 Windows 机器上的 advertised.listeners of the server.properties file in Kafka is resolvable。还要确保防火墙允许访问 (netstat -tupln | grep LIST),并寻找您的 Kafka 端口在 0.0.0.0 上侦听,例如。

【讨论】:

用 java 代码中来自 linux 和 kafka prop 的端口的详细信息更新了我的帖子。如果我还遗漏了什么,请告诉我。 127.0.0.1 是你的 Windows 机器,而不是 linux 机器 谢谢!!通过指向linux服务器地址和端口号,它工作正常!

以上是关于如何将 Kafka (Java) 应用程序从 Windows 连接到 Linux 中的 Confluent的主要内容,如果未能解决你的问题,请参考以下文章

如何将消息直接从现有的大型机应用程序发布到 Kafka 主题?

我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

如何使用 JAVA API 从 Kafka 获取每个主题的消息数量 [重复]

如何获取Kafka的消费者详情——从Scala到Java的切换

Flink是如何从kafka中拉取数据的