我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?

Posted

技术标签:

【中文标题】我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?【英文标题】:Can i get data from 2 different kafka server clusters in the same java app? 【发布时间】:2017-11-24 02:57:26 【问题描述】:

我是卡夫卡的新手。通常我会编写一个小型 java 演示应用程序,设置一个 kafka 消费者并从一个 3 kafka 服务器集群获取数据。它工作得很好。 我将设置如下服务器 props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092"),并将订阅 consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3")) 等主题。 现在我需要使用来自 2 个不同集群的数据。

所以 kafka 服务器将是“192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092”作为 1 个集群, “192.168.22.4:9092,192.168.22.5:9092,192.168.22.6:9092”作为另一个集群。

主题将是集群编号 1 中的“test_topic_1”、“test_topic_2”、“test_topic_3”、集群编号 2 中的、“test_topic_4”、“test_topic_5”、“test_topic_6”。

我可以在同一个 java 应用程序中执行此操作吗? 我试过但只能使用来自 1 个集群的数据。 我怎样才能让它工作?非常感谢。

感谢@yaswanth,我确实使用了 2 个实例。请看下面我的代码。

    public class Consumer 
    public static void main(String[] args) 
    System.out.println("begin consumer");
    consume();
    consume2();
    System.out.println("finish consumer");
     
    public static void consume() 
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.116.13:9092");
    props.put("group.id", "group-test1");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer",         "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("test_topic_1"));
     while (true) 
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) 
            System.out.println(record.topic()+"---------------------------"+record.value());
        
    

    public static void consume2() 

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.116.37:9092");
    props.put("group.id", "group-test2");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
    consumer2.subscribe(Arrays.asList("test_topic_2"));     
    while (true) 
        ConsumerRecords<String, String> records = consumer2.poll(100);
        for (ConsumerRecord<String, String> record : records) 
            System.out.println(record.topic()+"---------------------------"+record.value());
        
    

感谢您的帮助,@yaswanth,它有效。

【问题讨论】:

你需要使用不同的消费者实例。请发布一些代码。这将帮助其他人更好地了解什么是行不通的。 【参考方案1】:

这将不起作用,因为从未调用过 consume2()consume()consume2() 似乎分别是正确的。您需要在两个不同的线程中运行consume()consume2()。你可以这样做。从您的主线程启动两个不同的线程,并将consume()consume2() 包装在RunnableCallable 中。

只是给你一个想法,

public class Consumer2 implements Runnable 

    @Override
    public void run() 
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.37:9092");
        props.put("group.id", "group-test2");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
        consumer2.subscribe(Arrays.asList("test_topic_2"));
        while (true) 
            ConsumerRecords<String, String> records = consumer2.poll(100);
            for (ConsumerRecord<String, String> record : records) 
                System.out.println(record.topic()+"---------------------------"+record.value());
            
        
    


public class Consumer1 implements Runnable 

    @Override
    public void run() 
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.13:9092");
        props.put("group.id", "group-test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_topic_1"));
        while (true) 
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) 
                System.out.println(record.topic() + "---------------------------" + record.value());
            
        
    


public class AppStarter 

    public void init() 
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Runnable> runnables = new ArrayList<>();
        Future future1 = executorService.submit(new Consumer1());
        Future future2 = executorService.submit(new Consumer2());

        try 
            future1.get();
            future2.get();
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
        
    

    public static void main(String[] args) 
        AppStarter appStarter = new AppStarter();
        appStarter.init();
    

注意:以上代码只是为了让您了解如何实现它。您可以参数化属性并将其作为构造函数 arg 传递,并且只有一个 Consumer 类的实现。

【讨论】:

以上是关于我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?的主要内容,如果未能解决你的问题,请参考以下文章

Java中的广播消息

从 2 个不同片段的 sqlite 中的 2 个表中获取信息

Django 2.0:将数据从视图发送到包含在另一个模板中的模板(2 个不同的应用程序)

从java批量更新的两个不同表中的一个表中插入max + 1

java - 如何从Java中的String中删除不同数量的空格? [复制]

我可以从 2 个不同的 apache vhost 提供一个 django 应用程序吗?