在 kafka 集群节点之间分配数据套接字

Posted

技术标签:

【中文标题】在 kafka 集群节点之间分配数据套接字【英文标题】:Distributing data socket among kafka cluster nodes 【发布时间】:2019-05-29 01:58:45 【问题描述】:

我想从 socket 中获取数据并将其放入 kafka 主题中,以便我的 flink 程序可以从主题中读取数据并进行处理。我可以在一个节点上做到这一点。但我想拥有一个至少有三个不同节点(不同 IP 地址)的 kafka 集群,并从套接字轮询数据以在节点之间分发它。我不知道如何执行此操作并更改此代码。我的简单程序如下:

public class WordCount 

   public static void main(String[] args) throws Exception 

    kafka_test objKafka=new kafka_test();
  // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    int myport = 9999;
    String hostname = "localhost";
 // set up the execution environment
    final StreamExecutionEnvironment env = 
  StreamExecutionEnvironment.getExecutionEnvironment();


 // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    DataStream<String> stream = env.socketTextStream(hostname,myport);

    stream.addSink(objKafka.createStringProducer("testFlink", 
    "localhost:9092"));

    DataStream<String> text = 
    env.addSource(objKafka.createStringConsumerForTopic("testFlink", 
    "localhost:9092", "test"));
    DataStream<Tuple2<String, Long>> counts = text
     .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() 
                @Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) 
   
          // normalize and split the line
             String[] words = value.toLowerCase().split("\\W+");

                    // emit the pairs
             for (String word : words) 
                  if (!word.isEmpty()) 
                     out.collect(new Tuple2<String, Long>(word, 1L));
                        
                    
                
            )
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);
          // emit result
        if (params.has("output")) 
           counts.writeAsText(params.get("output"));
           else 
          System.out.println("Printing result to stdout. Use --output 
          to specify output path.");
          counts.print();
         
    // execute program
    env.execute("Streaming WordCount");

    //main
   

  public class kafka_test 
  public FlinkKafkaConsumer<String> createStringConsumerForTopic(
        String topic, String kafkaAddress, String kafkaGroup) 
  //        ************************** KAFKA Properties ******        
     Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
            topic, new SimpleStringSchema(), props);
    myconsumer.setStartFromLatest();     

    return myconsumer;
  

  public FlinkKafkaProducer<String> createStringProducer(
        String topic, String kafkaAddress) 

        return new FlinkKafkaProducer<>(kafkaAddress,
            topic, new SimpleStringSchema());
     
  

请指导我如何在不同的kafka节点之间广播套接字流数据?

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

我认为您的代码是正确的。 Kafka 将负责数据的“分发”。如何在 Kafka 代理之间分配数据将取决于主题配置。

查看答案here 以更好地了解 Kafka 主题和分区。

假设您有 3 个 Kafka 代理。然后,如果您创建具有 3 个副本和 3 个分区的主题

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic

这将导致您的主题将有 3 个分区,每个分区将在您的集群中存储 3 次。使用 3 个代理,您将在每个代理上存储 1 个分区和 2 个副本。

然后你只需要创建你的 Kafka Sink

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);

【讨论】:

亲爱的@belbo 感谢您的回答。但是假设我有三个具有不同 IP 地址的代理。那么,数据如何在经纪人之间分布?请告诉我是否必须在所有经纪人中使用您的代码(如上)? 不在代理中。你有两个集群。 Kafka 集群——这将由多个代理组成,每个代理都有不同的 IP。然后是 Flink 集群——由具有不同 IP 的多个节点/服务器组成。我在上面粘贴的代码用于您的 Flink 作业,它将在 Flink 集群上运行。该作业将使用 Kafka 代理 (broker1:9092,broker2:9092,broker3:9092) 的 IP 创建 Kafka 生产者,并将在这些 Kafka 代理之间分发数据。 亲爱的@belbo,很抱歉打扰您。事实上,我使用此命令“cat file.csv | nc -lk 9999”将数据从 CSV 文件泵送到套接字 9999。如果我将数据发送到所有broker IP 的9999 端口,那么每个broker 都可能得到相同的数据。如何将所有数据分发给三个经纪人?你能告诉我 Flink 主节点是否为我做这件事吗?感谢收获。 您希望在 Flink 节点之间分发数据,而不是在 Kafka 代理之间。我有点困惑,因为nc -lk 意味着您将监听数据,而不是发送数据。无论如何,为了实现你所需要的,我猜你需要在 netcat 和 Flink 节点之间放置某种负载均衡器,这将负责在 Flink 实例之间分配数据,或者编写一个 shell 脚本来负责发送不同行的文件到不同的 Flink 节点。但对于生产用途,我建议使用更强大的东西 - 也许是 Apache NiFi 或其他东西。 您无需在 Kafka 代理之间分发数据。作为 Flink 工作的一部分的 Kafka 生产者将负责处理它。数据的分布方式取决于您将要写入的主题的分区数。

以上是关于在 kafka 集群节点之间分配数据套接字的主要内容,如果未能解决你的问题,请参考以下文章

kafka集群扩容后的数据迁移

一文掌握 Kafka 集群快速扩容的方案

kafka集群里面如何重建单个节点

避坑指南:Kafka集群快速扩容的方案总结

为啥我的客户端应该只连接到一个子节点集群工作人员时接收来自所有子节点集群工作人员的套接字发射?

如何将套接字传递给节点中的集群