Error producing to embedded kafka
Posted: 2016-06-27 18:48:18
Question:
I am trying to embed a KafkaServer in my code. I used the example code below to learn how to do this, but for some reason my producer cannot send messages to the embedded server (it times out after 60 seconds). I am using Kafka 0.8.2.2. Can anyone tell me what I'm doing wrong?
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.Utils;
import org.apache.commons.collections.functors.ExceptionPredicate;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;
public class KafkaLocalBroker {
    public static final String TEST_TOPIC = "test-topic";
    public KafkaConfig kafkaConfig;
    public KafkaServer kafkaServer;
    public TestingServer zookeeper;

    public KafkaLocalBroker() throws Exception {
        zookeeper = new TestingServer(true);
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper.getConnectString());
        props.put("broker.id", 0);
        kafkaConfig = new KafkaConfig(props);
        kafkaServer = new KafkaServer(kafkaConfig, new Time() {
            public long nanoseconds() {
                return System.nanoTime();
            }
            public long milliseconds() {
                return System.currentTimeMillis();
            }
            public void sleep(long ms) {
                try {
                    Thread.sleep(ms);
                } catch (InterruptedException e) {
                    // Do Nothing
                }
            }
        });
        kafkaServer.startup();
        System.out.println("embedded kafka is up");
    }

    public void stop() {
        kafkaServer.shutdown();
        System.out.println("embedded kafka stop");
    }

    /**
     * a main that tests the embedded kafka
     * @param args
     */
    public static void main(String[] args) {
        KafkaLocalBroker kafkaLocalBroker = null;
        //init kafka server and start it:
        try {
            kafkaLocalBroker = new KafkaLocalBroker();
        } catch (Exception e) {
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //send ten messages to local kafka server:
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i);
            producer.send(data, (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("Failed to write log message: " + exception.getMessage());
                } else {
                    System.out.println("Successful write to offset in partition on topic : " +
                            metadata.offset() + ", " + metadata.partition() + ", " + metadata.topic());
                }
            });
        }
        //consume messages from Kafka:
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId");
        long offset = 0L;
        while (offset < 160) { //this is an exit criteria just for this test so we are not stuck in endless loop
            // create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB
            FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();//new FetchRequest(TEST_TOPIC, 0, offset, 1000000);
            // get the message set from the consumer and print them out
            FetchResponse messages = consumer.fetch(fetchRequest);
            for (MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) {
                ByteBuffer payload = msg.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                try {
                    System.out.println(new String(bytes, "UTF-8"));
                } catch (Exception e) {
                }
                // advance the offset after consuming each message
                offset = msg.offset();
            }
        }
        producer.close();
        //close the consumer
        consumer.close();
        //stop the kafka broker:
        if (kafkaLocalBroker != null) {
            kafkaLocalBroker.stop();
        }
    }
}
EDIT: I've included the exception returned from the producer below:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
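Comments:
How would you characterize the failure? Any specific error message? Are you sure the problem is with the producer and not the consumer?
producer.send hangs for 60 seconds and then spits out the TimeoutException I included in the edit above.
Did you ever solve this?
I upgraded my version to 0.10.0.0 and it works fine.
Answer 1:
The properties used to create the Kafka producer are not valid for 0.8. Go through ProducerConfig and change the properties, or update the Kafka version.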
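The answer does not spell out which properties it means. As a rough sketch only, assuming it refers to the old Scala producer that takes a kafka.producer.ProducerConfig, the settings might look like the following. The class LegacyProducerProps and its build method are hypothetical names made up for this illustration; the property keys are the ones the 0.8 producer documents, and the Kafka-specific construction is left in a comment so the snippet runs without the Kafka jar. Whether this removes the timeout in the question is not confirmed by the thread.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects settings for the old 0.8 producer.
final class LegacyProducerProps {

    static Properties build(String brokerList) {
        Properties props = new Properties();
        // The old producer locates brokers through metadata.broker.list
        // rather than bootstrap.servers.
        props.put("metadata.broker.list", brokerList);
        // Message values are encoded by a kafka.serializer.Encoder implementation.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Rough counterpart of acks=all: -1 waits for all in-sync replicas.
        props.put("request.required.acks", "-1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With the Kafka 0.8 jar on the classpath, the properties would be handed on as:
        //   new kafka.javaapi.producer.Producer<String, String>(
        //           new kafka.producer.ProducerConfig(props));
    }
}
```

Messages for that producer are wrapped in kafka.producer.KeyedMessage instead of ProducerRecord, so the send loop in the question would need to change as well.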
Comments:
Do I use 'org.apache.kafka.clients.producer.KafkaProducer' or 'org.apache.kafka.producer.Producer'?
When I use the configuration given in the included link, I get this exception: Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
It seems the Kafka producer needs at least bootstrap.servers, key.serializer and value.serializer to be configured.
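The last comment can be turned into a quick pre-flight check. This is a minimal sketch, assuming the three keys named in that comment are the ones KafkaProducer insists on; the list comes from the comment, not from the Kafka documentation, and ProducerConfigCheck and missingKeys are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): reports which expected producer keys are absent.
final class ProducerConfigCheck {

    // Keys taken from the comment above; treat the list as an assumption.
    static final List<String> REQUIRED =
            Arrays.asList("bootstrap.servers", "key.serializer", "value.serializer");

    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (!props.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Old-style settings only, as would be the case after switching property names.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        System.out.println("Missing: " + missingKeys(props));
    }
}
```

Running such a check before calling new KafkaProducer<>(props) shows immediately when old-style property names have been passed to the new client, which appears to be what produced the ConfigException quoted in the comments.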