我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?
Posted
技术标签:
【中文标题】我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?【英文标题】:Can i get data from 2 different kafka server clusters in the same java app? 【发布时间】:2017-11-24 02:57:26 【问题描述】:我是卡夫卡的新手。通常我会编写一个小型 java 演示应用程序,设置一个 kafka 消费者并从一个 3 kafka 服务器集群获取数据。它工作得很好。
我将设置如下服务器
props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092")
,并将订阅 consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3"))
等主题。
现在我需要使用来自 2 个不同集群的数据。
所以 kafka 服务器将是“192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092”作为 1 个集群, “192.168.22.4:9092,192.168.22.5:9092,192.168.22.6:9092”作为另一个集群。
主题将是集群编号 1 中的“test_topic_1”、“test_topic_2”、“test_topic_3”、集群编号 2 中的、“test_topic_4”、“test_topic_5”、“test_topic_6”。
我可以在同一个 java 应用程序中执行此操作吗? 我试过但只能使用来自 1 个集群的数据。 我怎样才能让它工作?非常感谢。
感谢@yaswanth,我确实使用了 2 个实例。请看下面我的代码。
public class Consumer
public static void main(String[] args)
System.out.println("begin consumer");
consume();
consume2();
System.out.println("finish consumer");
public static void consume()
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.13:9092");
props.put("group.id", "group-test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic_1"));
while (true)
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.topic()+"---------------------------"+record.value());
public static void consume2()
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.37:9092");
props.put("group.id", "group-test2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
consumer2.subscribe(Arrays.asList("test_topic_2"));
while (true)
ConsumerRecords<String, String> records = consumer2.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.topic()+"---------------------------"+record.value());
感谢您的帮助,@yaswanth,它有效。
【问题讨论】:
你需要使用不同的消费者实例。请发布一些代码。这将帮助其他人更好地了解什么是行不通的。 【参考方案1】:这将不起作用,因为从未调用过 consume2()
。 consume()
和 consume2()
似乎分别是正确的。您需要在两个不同的线程中运行consume()
和consume2()
。你可以这样做。从您的主线程启动两个不同的线程,并将consume()
和consume2()
包装在Runnable
或Callable
中。
只是给你一个想法,
public class Consumer2 implements Runnable
@Override
public void run()
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.37:9092");
props.put("group.id", "group-test2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
consumer2.subscribe(Arrays.asList("test_topic_2"));
while (true)
ConsumerRecords<String, String> records = consumer2.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.topic()+"---------------------------"+record.value());
public class Consumer1 implements Runnable
@Override
public void run()
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.13:9092");
props.put("group.id", "group-test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic_1"));
while (true)
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.topic() + "---------------------------" + record.value());
public class AppStarter
public void init()
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Runnable> runnables = new ArrayList<>();
Future future1 = executorService.submit(new Consumer1());
Future future2 = executorService.submit(new Consumer2());
try
future1.get();
future2.get();
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
public static void main(String[] args)
AppStarter appStarter = new AppStarter();
appStarter.init();
注意:以上代码只是为了让您了解如何实现它。您可以参数化属性并将其作为构造函数 arg 传递,并且只有一个 Consumer 类的实现。
【讨论】:
以上是关于我可以从同一个 Java 应用程序中的 2 个不同的 kafka 服务器集群获取数据吗?的主要内容,如果未能解决你的问题,请参考以下文章
从 2 个不同片段的 sqlite 中的 2 个表中获取信息
Django 2.0:将数据从视图发送到包含在另一个模板中的模板(2 个不同的应用程序)
从java批量更新的两个不同表中的一个表中插入max + 1